Skip to content

Commit

Permalink
Refine the GC interface by PageEntriesEdit (#3879)
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored Jan 20, 2022
1 parent 3e10ec3 commit a497442
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 116 deletions.
45 changes: 19 additions & 26 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/PageEntriesEdit.h>

#include <ext/scope_guard.h>
#include <mutex>

namespace ProfileEvents
{
Expand Down Expand Up @@ -90,7 +92,6 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
switch (write.type)
{
case WriteBatch::WriteType::PUT:
case WriteBatch::WriteType::UPSERT:
{
ChecksumClass digest;
PageEntryV3 entry;
Expand Down Expand Up @@ -124,16 +125,7 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
}

buffer_pos += write.size;

if (write.type == WriteBatch::WriteType::PUT)
{
edit.put(write.page_id, entry);
}
else // WriteBatch::WriteType::UPSERT
{
edit.upsertPage(write.page_id, entry);
}

edit.put(write.page_id, entry);
break;
}
case WriteBatch::WriteType::DEL:
Expand All @@ -146,6 +138,8 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
edit.ref(write.page_id, write.ori_page_id);
break;
}
default:
throw Exception(fmt::format("Unknown write type: {}", write.type));
}
}

Expand Down Expand Up @@ -192,12 +186,12 @@ std::pair<BlobFileId, BlobFileOffset> BlobStore::getPosFromStats(size_t size)
{
auto lock_stats = blob_stats.lock();
BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, config.file_limit_size);
std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, config.file_limit_size, lock_stats);

// No valid stat for putting data with `size`, create a new one
if (stat == nullptr)
{
stat = blob_stats.createStat(blob_file_id);
stat = blob_stats.createStat(blob_file_id, lock_stats);
}
}

Expand Down Expand Up @@ -362,13 +356,11 @@ std::vector<BlobFileId> BlobStore::getGCStats()
return blob_need_gc;
}

PageIdAndVersionedEntryList BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> & entries_need_gc,
const PageSize & total_page_size,
const WriteLimiterPtr & write_limiter,
const ReadLimiterPtr & read_limiter)
PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> & entries_need_gc,
const PageSize & total_page_size,
const WriteLimiterPtr & write_limiter,
const ReadLimiterPtr & read_limiter)
{
PageIdAndVersionedEntryList copy_list;

PageEntriesEdit edit;

if (total_page_size == 0)
Expand Down Expand Up @@ -407,12 +399,12 @@ PageIdAndVersionedEntryList BlobStore::gc(std::map<BlobFileId, PageIdAndVersione
new_entry.size = entry.size;

new_entry.file_id = blobfile_id;
new_entry.offset = offset_in_data;
new_entry.offset = offset_in_data; // FIXME: offset_in_file + offset_in_data?

offset_in_data += new_entry.size;
data_pos += new_entry.size;

copy_list.emplace_back(std::make_tuple(page_id, std::move(versioned), std::move(new_entry)));
edit.upsertPage(page_id, versioned, new_entry);
}
}
}
Expand All @@ -438,7 +430,7 @@ PageIdAndVersionedEntryList BlobStore::gc(std::map<BlobFileId, PageIdAndVersione
throw e;
}

return copy_list;
return edit;
}

String BlobStore::getBlobFilePath(BlobFileId blob_id) const
Expand All @@ -460,13 +452,13 @@ BlobStore::BlobStats::BlobStats(Poco::Logger * log_, BlobStore::Config config_)
{
}

std::lock_guard<std::mutex> BlobStore::BlobStats::lock()
std::lock_guard<std::mutex> BlobStore::BlobStats::lock() const
{
return std::lock_guard(lock_stats);
}


BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id)
BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
{
BlobStatPtr stat = nullptr;

Expand Down Expand Up @@ -508,7 +500,7 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id)
return stat;
}

void BlobStore::BlobStats::eraseStat(BlobFileId blob_file_id)
void BlobStore::BlobStats::eraseStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
{
BlobStatPtr stat = nullptr;
bool found = false;
Expand Down Expand Up @@ -553,7 +545,7 @@ BlobFileId BlobStore::BlobStats::chooseNewStat()
return rv;
}

std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_size, UInt64 file_limit_size)
std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_size, UInt64 file_limit_size, const std::lock_guard<std::mutex> &)
{
BlobStatPtr stat_ptr = nullptr;
double smallest_valid_rate = 2;
Expand Down Expand Up @@ -638,6 +630,7 @@ void BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, si

BlobStatPtr BlobStore::BlobStats::fileIdToStat(BlobFileId file_id)
{
auto guard = lock();
for (auto & stat : stats_map)
{
if (stat->id == file_id)
Expand Down
22 changes: 11 additions & 11 deletions dbms/src/Storages/Page/V3/BlobStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ extern const int LOGICAL_ERROR;

namespace PS::V3
{
using PageIdAndVersionedEntryList = std::vector<std::tuple<PageId, PageVersionType, PageEntryV3>>;
using PageIdAndVersionedEntries = std::vector<std::pair<PageId, VersionedEntries>>;

class BlobStore : public Allocator<false>
Expand Down Expand Up @@ -100,11 +99,11 @@ class BlobStore : public Allocator<false>
public:
BlobStats(Poco::Logger * log_, BlobStore::Config config);

std::lock_guard<std::mutex> lock();
std::lock_guard<std::mutex> lock() const;

BlobStatPtr createStat(BlobFileId blob_file_id);
BlobStatPtr createStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &);

void eraseStat(BlobFileId blob_file_id);
void eraseStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &);

/**
* Choose an available `BlobStat` from `BlobStats`.
Expand All @@ -120,14 +119,15 @@ class BlobStore : public Allocator<false>
* The `INVALID_BLOBFILE_ID` means that you don't need to create a new `BlobFile`.
*
*/
std::pair<BlobStatPtr, BlobFileId> chooseStat(size_t buf_size, UInt64 file_limit_size);
std::pair<BlobStatPtr, BlobFileId> chooseStat(size_t buf_size, UInt64 file_limit_size, const std::lock_guard<std::mutex> &);

BlobFileId chooseNewStat();

BlobStatPtr fileIdToStat(BlobFileId file_id);

std::list<BlobStatPtr> getStats() const
{
    // Return a copy of `stats_map`, made while the guard obtained from
    // `lock()` is held. The guard is released only after the returned list
    // has been constructed.
    const auto stats_guard = lock();
    return stats_map;
}

Expand All @@ -140,7 +140,7 @@ class BlobStore : public Allocator<false>
BlobFileId roll_id = 0;
std::list<BlobFileId> old_ids;
std::list<BlobStatPtr> stats_map;
std::mutex lock_stats;
mutable std::mutex lock_stats;
};

BlobStore(const FileProviderPtr & file_provider_, String path, BlobStore::Config config);
Expand All @@ -149,10 +149,10 @@ class BlobStore : public Allocator<false>

std::vector<BlobFileId> getGCStats();

PageIdAndVersionedEntryList gc(std::map<BlobFileId, PageIdAndVersionedEntries> & entries_need_gc,
const PageSize & total_page_size,
const WriteLimiterPtr & write_limiter = nullptr,
const ReadLimiterPtr & read_limiter = nullptr);
PageEntriesEdit gc(std::map<BlobFileId, PageIdAndVersionedEntries> & entries_need_gc,
const PageSize & total_page_size,
const WriteLimiterPtr & write_limiter = nullptr,
const ReadLimiterPtr & read_limiter = nullptr);

PageEntriesEdit write(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);

Expand Down Expand Up @@ -202,4 +202,4 @@ class BlobStore : public Allocator<false>
using BlobStorePtr = std::shared_ptr<BlobStore>;

} // namespace PS::V3
} // namespace DB
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/LogFile/LogReader.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Common/Checksum.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Encryption/ReadBufferFromFileProvider.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
Expand Down Expand Up @@ -37,6 +38,8 @@ LogReader::LogReader(
, log_number(log_num_)
, log(log_)
{
// Must be `BLOCK_SIZE`, or we can not ensure the correctness of reading.
assert(file->internalBuffer().size() == Format::BLOCK_SIZE);
}

LogReader::~LogReader() = default;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/LogFile/LogReader.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <Encryption/ReadBufferFromFileProvider.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <Storages/Page/V3/WALStore.h>
#include <common/types.h>
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Poco/Logger.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <Storages/Page/V3/LogFile/LogWriter.h>
#include <common/logger_useful.h>
#include <fmt/format.h>
Expand All @@ -21,6 +22,8 @@ LogWriter::LogWriter(
, recycle_log_files(recycle_log_files_)
, manual_flush(manual_flush_)
{
// Must be `BLOCK_SIZE`, or we can not ensure the correctness of writing.
assert(dest->internalBuffer().size() == Format::BLOCK_SIZE);
}

LogWriter::~LogWriter()
Expand Down
63 changes: 40 additions & 23 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ extern const int PS_ENTRY_NO_VALID_VERSION;
} // namespace ErrorCodes
namespace PS::V3
{
/********************************
* VersionedPageEntries methods *
********************************/

std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
{
auto page_lock = acquireLock();
Expand All @@ -32,7 +36,6 @@ std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
return std::nullopt;
}


std::pair<VersionedEntries, PageSize> VersionedPageEntries::getEntriesByBlobId(BlobFileId blob_id)
{
PageSize single_page_size = 0;
Expand Down Expand Up @@ -103,6 +106,10 @@ std::pair<PageEntriesV3, bool> VersionedPageEntries::deleteAndGC(UInt64 lowest_s
return std::make_pair(std::move(del_entries), entries.empty() || (entries.size() == 1 && entries.begin()->second.is_delete));
}

/**************************
* PageDirectory methods *
*************************/

PageDirectory::PageDirectory()
: sequence(0)
, log(getLogWithPrefix(nullptr, "PageDirectory"))
Expand All @@ -117,7 +124,10 @@ void PageDirectory::restore()
PageDirectorySnapshotPtr PageDirectory::createSnapshot() const
{
auto snap = std::make_shared<PageDirectorySnapshot>(sequence.load());
snapshots.emplace_back(std::weak_ptr<PageDirectorySnapshot>(snap));
{
std::lock_guard snapshots_lock(snapshots_mutex);
snapshots.emplace_back(std::weak_ptr<PageDirectorySnapshot>(snap));
}
return snap;
}

Expand Down Expand Up @@ -179,10 +189,8 @@ void PageDirectory::apply(PageEntriesEdit && edit)
std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline
UInt64 last_sequence = sequence.load();

auto snap = createSnapshot();

// stage 1, persisted the changes to WAL
// wal.apply(edit);
// stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0]
// wal->apply(edit, PageVersionType(last_sequence + 1, 0));

// stage 2, create entry version list for pageId. nothing need to be rollback
std::unordered_map<PageId, std::pair<PageLock, int>> updating_locks;
Expand Down Expand Up @@ -269,23 +277,28 @@ void PageDirectory::apply(PageEntriesEdit && edit)
sequence.fetch_add(1);
}


void PageDirectory::gcApply(const PageIdAndVersionedEntryList & migrated_entries)
void PageDirectory::gcApply(PageEntriesEdit && migrated_edit)
{
for (const auto & [page_id, version, entry] : migrated_entries)
std::shared_lock read_lock(table_rw_mutex);
for (auto & record : migrated_edit.getMutRecords())
{
auto iter = mvcc_table_directory.find(page_id);
if (iter == mvcc_table_directory.end())
auto iter = mvcc_table_directory.find(record.page_id);
if (unlikely(iter == mvcc_table_directory.end()))
{
throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", page_id), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
}

auto versioned_page = iter->second;
iter->second->acquireLock();
iter->second->createNewVersion(version.sequence, version.epoch + 1, entry);
// Increase the epoch for migrated record.
record.version.epoch += 1;

// TBD: wal apply
// Append the gc version to version list
auto & versioned_entries = iter->second;
auto page_lock = versioned_entries->acquireLock();
versioned_entries->createNewVersion(record.version.sequence, record.version.epoch, record.entry);
}

// Apply migrate edit into WAL with the increased epoch version
// wal->apply(migrated_edit);
}

std::pair<std::map<BlobFileId, PageIdAndVersionedEntries>, PageSize>
Expand Down Expand Up @@ -330,17 +343,21 @@ std::vector<PageEntriesV3> PageDirectory::gc()
UInt64 lowest_seq = sequence.load();
std::vector<PageEntriesV3> all_del_entries;

// Cleanup released snapshots
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */)
{
if (iter->expired())
iter = snapshots.erase(iter);
else
// Cleanup released snapshots
std::lock_guard lock(snapshots_mutex);
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */)
{
lowest_seq = std::min(lowest_seq, iter->lock()->sequence);
++iter;
if (iter->expired())
iter = snapshots.erase(iter);
else
{
lowest_seq = std::min(lowest_seq, iter->lock()->sequence);
++iter;
}
}
}

{
std::unique_lock write_lock(table_rw_mutex);
for (auto iter = mvcc_table_directory.begin(); iter != mvcc_table_directory.end(); /*empty*/)
Expand Down
Loading

0 comments on commit a497442

Please sign in to comment.