Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine the GC interface by PageEntriesEdit #3879

Merged
merged 6 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 19 additions & 26 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/PageEntriesEdit.h>

#include <ext/scope_guard.h>
#include <mutex>

namespace ProfileEvents
{
Expand Down Expand Up @@ -90,7 +92,6 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
switch (write.type)
{
case WriteBatch::WriteType::PUT:
case WriteBatch::WriteType::UPSERT:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why removed UPSERT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After digging into current gc routine implementation, we only use PageEntriesEdit:: upsertPage in BlobStore::gc (with version set in the same time). WriteBatch::upsert is not a valid input.
https://github.com/pingcap/tics/blob/ded8a062b9f777ccfd70087f6341feac725bcb08/dbms/src/Storages/Page/V3/BlobStore.cpp#L406

{
ChecksumClass digest;
PageEntryV3 entry;
Expand Down Expand Up @@ -124,16 +125,7 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
}

buffer_pos += write.size;

if (write.type == WriteBatch::WriteType::PUT)
{
edit.put(write.page_id, entry);
}
else // WriteBatch::WriteType::UPSERT
{
edit.upsertPage(write.page_id, entry);
}

edit.put(write.page_id, entry);
break;
}
case WriteBatch::WriteType::DEL:
Expand All @@ -146,6 +138,8 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
edit.ref(write.page_id, write.ori_page_id);
break;
}
default:
throw Exception(fmt::format("Unknown write type: {}", write.type));
}
}

Expand Down Expand Up @@ -192,12 +186,12 @@ std::pair<BlobFileId, BlobFileOffset> BlobStore::getPosFromStats(size_t size)
{
auto lock_stats = blob_stats.lock();
BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, config.file_limit_size);
std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, config.file_limit_size, lock_stats);

// No valid stat for puting data with `size`, create a new one
if (stat == nullptr)
{
stat = blob_stats.createStat(blob_file_id);
stat = blob_stats.createStat(blob_file_id, lock_stats);
}
}

Expand Down Expand Up @@ -362,13 +356,11 @@ std::vector<BlobFileId> BlobStore::getGCStats()
return blob_need_gc;
}

PageIdAndVersionedEntryList BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> & entries_need_gc,
const PageSize & total_page_size,
const WriteLimiterPtr & write_limiter,
const ReadLimiterPtr & read_limiter)
PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> & entries_need_gc,
const PageSize & total_page_size,
const WriteLimiterPtr & write_limiter,
const ReadLimiterPtr & read_limiter)
{
PageIdAndVersionedEntryList copy_list;

PageEntriesEdit edit;

if (total_page_size == 0)
Expand Down Expand Up @@ -407,12 +399,12 @@ PageIdAndVersionedEntryList BlobStore::gc(std::map<BlobFileId, PageIdAndVersione
new_entry.size = entry.size;

new_entry.file_id = blobfile_id;
new_entry.offset = offset_in_data;
new_entry.offset = offset_in_data; // FIXME: offset_in_file + offset_in_data?
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved

offset_in_data += new_entry.size;
data_pos += new_entry.size;

copy_list.emplace_back(std::make_tuple(page_id, std::move(versioned), std::move(new_entry)));
edit.upsertPage(page_id, versioned, new_entry);
}
}
}
Expand All @@ -438,7 +430,7 @@ PageIdAndVersionedEntryList BlobStore::gc(std::map<BlobFileId, PageIdAndVersione
throw e;
}

return copy_list;
return edit;
}

String BlobStore::getBlobFilePath(BlobFileId blob_id) const
Expand All @@ -460,13 +452,13 @@ BlobStore::BlobStats::BlobStats(Poco::Logger * log_, BlobStore::Config config_)
{
}

std::lock_guard<std::mutex> BlobStore::BlobStats::lock()
std::lock_guard<std::mutex> BlobStore::BlobStats::lock() const
{
return std::lock_guard(lock_stats);
}


BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id)
BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
{
BlobStatPtr stat = nullptr;

Expand Down Expand Up @@ -508,7 +500,7 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id)
return stat;
}

void BlobStore::BlobStats::eraseStat(BlobFileId blob_file_id)
void BlobStore::BlobStats::eraseStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
{
BlobStatPtr stat = nullptr;
bool found = false;
Expand Down Expand Up @@ -553,7 +545,7 @@ BlobFileId BlobStore::BlobStats::chooseNewStat()
return rv;
}

std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_size, UInt64 file_limit_size)
std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_size, UInt64 file_limit_size, const std::lock_guard<std::mutex> &)
{
BlobStatPtr stat_ptr = nullptr;
double smallest_valid_rate = 2;
Expand Down Expand Up @@ -638,6 +630,7 @@ void BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, si

BlobStatPtr BlobStore::BlobStats::fileIdToStat(BlobFileId file_id)
{
auto guard = lock();
for (auto & stat : stats_map)
{
if (stat->id == file_id)
Expand Down
22 changes: 11 additions & 11 deletions dbms/src/Storages/Page/V3/BlobStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ extern const int LOGICAL_ERROR;

namespace PS::V3
{
using PageIdAndVersionedEntryList = std::vector<std::tuple<PageId, PageVersionType, PageEntryV3>>;
using PageIdAndVersionedEntries = std::vector<std::pair<PageId, VersionedEntries>>;

class BlobStore : public Allocator<false>
Expand Down Expand Up @@ -100,11 +99,11 @@ class BlobStore : public Allocator<false>
public:
BlobStats(Poco::Logger * log_, BlobStore::Config config);

std::lock_guard<std::mutex> lock();
std::lock_guard<std::mutex> lock() const;

BlobStatPtr createStat(BlobFileId blob_file_id);
BlobStatPtr createStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &);

void eraseStat(BlobFileId blob_file_id);
void eraseStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &);

/**
* Choose a available `BlobStat` from `BlobStats`.
Expand All @@ -120,14 +119,15 @@ class BlobStore : public Allocator<false>
* The `INVALID_BLOBFILE_ID` means that you don't need create a new `BlobFile`.
*
*/
std::pair<BlobStatPtr, BlobFileId> chooseStat(size_t buf_size, UInt64 file_limit_size);
std::pair<BlobStatPtr, BlobFileId> chooseStat(size_t buf_size, UInt64 file_limit_size, const std::lock_guard<std::mutex> &);

BlobFileId chooseNewStat();

BlobStatPtr fileIdToStat(BlobFileId file_id);

std::list<BlobStatPtr> getStats() const
{
auto guard = lock();
return stats_map;
}

Expand All @@ -140,7 +140,7 @@ class BlobStore : public Allocator<false>
BlobFileId roll_id = 0;
std::list<BlobFileId> old_ids;
std::list<BlobStatPtr> stats_map;
std::mutex lock_stats;
mutable std::mutex lock_stats;
};

BlobStore(const FileProviderPtr & file_provider_, String path, BlobStore::Config config);
Expand All @@ -149,10 +149,10 @@ class BlobStore : public Allocator<false>

std::vector<BlobFileId> getGCStats();

PageIdAndVersionedEntryList gc(std::map<BlobFileId, PageIdAndVersionedEntries> & entries_need_gc,
const PageSize & total_page_size,
const WriteLimiterPtr & write_limiter = nullptr,
const ReadLimiterPtr & read_limiter = nullptr);
PageEntriesEdit gc(std::map<BlobFileId, PageIdAndVersionedEntries> & entries_need_gc,
const PageSize & total_page_size,
const WriteLimiterPtr & write_limiter = nullptr,
const ReadLimiterPtr & read_limiter = nullptr);

PageEntriesEdit write(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);

Expand Down Expand Up @@ -202,4 +202,4 @@ class BlobStore : public Allocator<false>
using BlobStorePtr = std::shared_ptr<BlobStore>;

} // namespace PS::V3
} // namespace DB
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/LogFile/LogReader.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Common/Checksum.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Encryption/ReadBufferFromFileProvider.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
Expand Down Expand Up @@ -37,6 +38,8 @@ LogReader::LogReader(
, log_number(log_num_)
, log(log_)
{
// Must be `BLOCK_SIZE`, or we can not ensure the correctness of reading.
assert(file->internalBuffer().size() == Format::BLOCK_SIZE);
}

LogReader::~LogReader() = default;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/LogFile/LogReader.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <Encryption/ReadBufferFromFileProvider.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <Storages/Page/V3/WALStore.h>
#include <common/types.h>
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Poco/Logger.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <Storages/Page/V3/LogFile/LogWriter.h>
#include <common/logger_useful.h>
#include <fmt/format.h>
Expand All @@ -21,6 +22,8 @@ LogWriter::LogWriter(
, recycle_log_files(recycle_log_files_)
, manual_flush(manual_flush_)
{
// Must be `BLOCK_SIZE`, or we can not ensure the correctness of writing.
assert(dest->internalBuffer().size() == Format::BLOCK_SIZE);
}

LogWriter::~LogWriter()
Expand Down
63 changes: 40 additions & 23 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ extern const int PS_ENTRY_NO_VALID_VERSION;
} // namespace ErrorCodes
namespace PS::V3
{
/********************************
* VersionedPageEntries methods *
********************************/

std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
{
auto page_lock = acquireLock();
Expand All @@ -32,7 +36,6 @@ std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
return std::nullopt;
}


std::pair<VersionedEntries, PageSize> VersionedPageEntries::getEntriesByBlobId(BlobFileId blob_id)
{
PageSize single_page_size = 0;
Expand Down Expand Up @@ -103,6 +106,10 @@ std::pair<PageEntriesV3, bool> VersionedPageEntries::deleteAndGC(UInt64 lowest_s
return std::make_pair(std::move(del_entries), entries.empty() || (entries.size() == 1 && entries.begin()->second.is_delete));
}

/**************************
* PageDirectory methods *
*************************/

PageDirectory::PageDirectory()
: sequence(0)
, log(getLogWithPrefix(nullptr, "PageDirectory"))
Expand All @@ -117,7 +124,10 @@ void PageDirectory::restore()
PageDirectorySnapshotPtr PageDirectory::createSnapshot() const
{
auto snap = std::make_shared<PageDirectorySnapshot>(sequence.load());
snapshots.emplace_back(std::weak_ptr<PageDirectorySnapshot>(snap));
{
std::lock_guard snapshots_lock(snapshots_mutex);
snapshots.emplace_back(std::weak_ptr<PageDirectorySnapshot>(snap));
}
return snap;
}

Expand Down Expand Up @@ -179,10 +189,8 @@ void PageDirectory::apply(PageEntriesEdit && edit)
std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline
UInt64 last_sequence = sequence.load();

auto snap = createSnapshot();

// stage 1, persisted the changes to WAL
// wal.apply(edit);
// stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0]
// wal->apply(edit, PageVersionType(last_sequence + 1, 0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we do have wal->apply already , can we just use it? or is there remain some work for it?

Copy link
Contributor Author

@JaySon-Huang JaySon-Huang Jan 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// stage 2, create entry version list for pageId. nothing need to be rollback
std::unordered_map<PageId, std::pair<PageLock, int>> updating_locks;
Expand Down Expand Up @@ -269,23 +277,28 @@ void PageDirectory::apply(PageEntriesEdit && edit)
sequence.fetch_add(1);
}


void PageDirectory::gcApply(const PageIdAndVersionedEntryList & migrated_entries)
void PageDirectory::gcApply(PageEntriesEdit && migrated_edit)
{
for (const auto & [page_id, version, entry] : migrated_entries)
std::shared_lock read_lock(table_rw_mutex);
for (auto & record : migrated_edit.getMutRecords())
{
auto iter = mvcc_table_directory.find(page_id);
if (iter == mvcc_table_directory.end())
auto iter = mvcc_table_directory.find(record.page_id);
if (unlikely(iter == mvcc_table_directory.end()))
{
throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", page_id), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
}

auto versioned_page = iter->second;
iter->second->acquireLock();
iter->second->createNewVersion(version.sequence, version.epoch + 1, entry);
// Increase the epoch for migrated record.
record.version.epoch += 1;

// TBD: wal apply
// Append the gc version to version list
auto & versioned_entries = iter->second;
auto page_lock = versioned_entries->acquireLock();
versioned_entries->createNewVersion(record.version.sequence, record.version.epoch, record.entry);
}

// Apply migrate edit into WAL with the increased epoch version
// wal->apply(migrated_edit);
}

std::pair<std::map<BlobFileId, PageIdAndVersionedEntries>, PageSize>
Expand Down Expand Up @@ -330,17 +343,21 @@ std::vector<PageEntriesV3> PageDirectory::gc()
UInt64 lowest_seq = sequence.load();
std::vector<PageEntriesV3> all_del_entries;

// Cleanup released snapshots
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */)
{
if (iter->expired())
iter = snapshots.erase(iter);
else
// Cleanup released snapshots
std::lock_guard lock(snapshots_mutex);
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */)
{
lowest_seq = std::min(lowest_seq, iter->lock()->sequence);
++iter;
if (iter->expired())
iter = snapshots.erase(iter);
else
{
lowest_seq = std::min(lowest_seq, iter->lock()->sequence);
++iter;
}
}
}

{
std::unique_lock write_lock(table_rw_mutex);
for (auto iter = mvcc_table_directory.begin(); iter != mvcc_table_directory.end(); /*empty*/)
Expand Down
Loading