Skip to content

Commit

Permalink
Refactor WAL log writer && IO limiter (#4107)
Browse files Browse the repository at this point in the history
ref #3594
  • Loading branch information
jiaqizho authored Mar 2, 2022
1 parent 4954a24 commit a89b3bb
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 251 deletions.
73 changes: 50 additions & 23 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Poco/Logger.h>
#include <Storages/Page/PageUtil.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <Storages/Page/V3/LogFile/LogWriter.h>
#include <common/logger_useful.h>
Expand All @@ -12,50 +13,63 @@
namespace DB::PS::V3
{
LogWriter::LogWriter(
std::unique_ptr<WriteBufferFromFileBase> && dest_,
String path_,
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_)
: dest(std::move(dest_))
: path(path_)
, file_provider(file_provider_)
, block_offset(0)
, log_number(log_number_)
, recycle_log_files(recycle_log_files_)
, manual_flush(manual_flush_)
, write_buffer(nullptr, 0)
{
// Must be `BLOCK_SIZE`, or we can not ensure the correctness of writing.
assert(dest->internalBuffer().size() == Format::BLOCK_SIZE);
log_file = file_provider->newWritableFile(
path,
EncryptionPath(path, ""),
false,
/*create_new_encryption_info_*/ false);

buffer = static_cast<char *>(alloc(buffer_size));
write_buffer = WriteBuffer(buffer, buffer_size);
}

// Rewind the in-memory staging buffer to offset 0 by re-constructing
// `write_buffer` over the same `buffer` allocation (no reallocation).
// Called by flush() after the buffered bytes have been written to the
// log file, so the next addRecord() starts filling from the beginning
// of the buffer again. It is also invoked after `buffer` is reallocated
// to a larger size, to re-point `write_buffer` at the new storage —
// assumes `buffer`/`buffer_size` are kept consistent by the caller.
void LogWriter::resetBuffer()
{
write_buffer = WriteBuffer(buffer, buffer_size);
}

LogWriter::~LogWriter()
{
if (dest)
{
flush();
log_file->fsync();
log_file->close();

dest->close(); // close explicitly
}
free(buffer, buffer_size);
}

size_t LogWriter::writtenBytes() const
{
return dest->getMaterializedBytes();
return written_bytes;
}

void LogWriter::flush()
void LogWriter::flush(const WriteLimiterPtr & write_limiter)
{
dest->sync();
PageUtil::writeFile(log_file, written_bytes, write_buffer.buffer().begin(), write_buffer.offset(), write_limiter, false);
log_file->fsync();
written_bytes += write_buffer.offset();

// reset the write_buffer
resetBuffer();
}

void LogWriter::close()
{
if (dest)
{
dest->close();
dest.reset();
}
log_file->close();
}

void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size)
void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const WriteLimiterPtr & write_limiter)
{
// Header size varies depending on whether we are recycling or not.
const int header_size = recycle_log_files ? Format::RECYCLABLE_HEADER_SIZE : Format::HEADER_SIZE;
Expand All @@ -64,6 +78,17 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size)
// we still want to iterate once to emit a single zero-length record.
bool begin = true;
size_t payload_left = payload_size;

size_t head_sizes = ((payload_size / Format::BLOCK_SIZE) + 1) * Format::RECYCLABLE_HEADER_SIZE;
if (payload_size + head_sizes >= buffer_size)
{
size_t new_buff_size = payload_size + ((head_sizes / Format::BLOCK_SIZE) + 1) * Format::BLOCK_SIZE;

buffer = static_cast<char *>(realloc(buffer, buffer_size, new_buff_size));
buffer_size = new_buff_size;
resetBuffer();
}

do
{
const Int64 leftover = Format::BLOCK_SIZE - block_offset;
Expand All @@ -75,7 +100,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size)
{
// Fill the trailer with all zero
static constexpr char MAX_ZERO_HEADER[Format::RECYCLABLE_HEADER_SIZE]{'\x00'};
writeString(MAX_ZERO_HEADER, leftover, *dest);
writeString(MAX_ZERO_HEADER, leftover, write_buffer);
}
block_offset = 0;
}
Expand All @@ -101,7 +126,9 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size)
} while (payload.hasPendingData());

if (!manual_flush)
dest->sync();
{
flush(write_limiter);
}
}

void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload, size_t length)
Expand Down Expand Up @@ -150,9 +177,9 @@ void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload
// Write the checksum, header and the payload
digest.update(payload.position(), length);
Format::ChecksumType checksum = digest.checksum();
writeIntBinary(checksum, *dest);
writeString(header_buff.buffer().begin(), header_buff.count(), *dest);
writeString(payload.position(), length, *dest);
writeIntBinary(checksum, write_buffer);
writeString(header_buff.buffer().begin(), header_buff.count(), write_buffer);
writeString(payload.position(), length, write_buffer);

block_offset += header_size + length;
}
Expand Down
24 changes: 19 additions & 5 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <Encryption/FileProvider.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <common/types.h>

Expand Down Expand Up @@ -55,11 +56,12 @@ namespace PS::V3
* Log number = 32bit log file number, so that we can distinguish between
* records written by the most recent log writer vs a previous one.
*/
class LogWriter final
class LogWriter final : private Allocator<false>
{
public:
LogWriter(
std::unique_ptr<WriteBufferFromFileBase> && dest_,
String path_,
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_ = false);
Expand All @@ -69,9 +71,9 @@ class LogWriter final

~LogWriter();

void addRecord(ReadBuffer & payload, size_t payload_size);
void addRecord(ReadBuffer & payload, size_t payload_size, const WriteLimiterPtr & write_limiter = nullptr);

void flush();
void flush(const WriteLimiterPtr & write_limiter = nullptr);

void close();

Expand All @@ -85,14 +87,26 @@ class LogWriter final
private:
void emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload, size_t length);

void resetBuffer();

private:
std::unique_ptr<WriteBufferFromFileBase> dest;
String path;
FileProviderPtr file_provider;

WritableFilePtr log_file;

size_t block_offset; // Current offset in block
Format::LogNumberType log_number;
const bool recycle_log_files;
// If true, it does not flush after each write. Instead it relies on the upper
// layer to manually does the flush by calling ::flush()
const bool manual_flush;

size_t written_bytes = 0;

char * buffer;
size_t buffer_size = Format::BLOCK_SIZE;
WriteBuffer write_buffer;
};
} // namespace PS::V3
} // namespace DB
12 changes: 6 additions & 6 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,13 @@ std::set<PageId> PageDirectory::getAllPageIds()
return page_ids;
}

void PageDirectory::apply(PageEntriesEdit && edit)
void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter)
{
std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline
UInt64 last_sequence = sequence.load();

// stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0]
wal->apply(edit, PageVersionType(last_sequence + 1, 0));
wal->apply(edit, PageVersionType(last_sequence + 1, 0), write_limiter);

// stage 2, create entry version list for pageId. nothing need to be rollback
std::unordered_map<PageId, std::pair<PageLock, int>> updating_locks;
Expand Down Expand Up @@ -496,7 +496,7 @@ void PageDirectory::apply(PageEntriesEdit && edit)
sequence.fetch_add(1);
}

std::set<PageId> PageDirectory::gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids)
std::set<PageId> PageDirectory::gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids, const WriteLimiterPtr & write_limiter)
{
{
std::shared_lock read_lock(table_rw_mutex);
Expand All @@ -519,7 +519,7 @@ std::set<PageId> PageDirectory::gcApply(PageEntriesEdit && migrated_edit, bool n
} // Then we should release the read lock on `table_rw_mutex`

// Apply migrate edit into WAL with the increased epoch version
wal->apply(migrated_edit);
wal->apply(migrated_edit, write_limiter);

if (!need_scan_page_ids)
{
Expand Down Expand Up @@ -567,12 +567,12 @@ PageDirectory::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_need_gc)
}


std::vector<PageEntriesV3> PageDirectory::gc()
std::vector<PageEntriesV3> PageDirectory::gc(const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter)
{
[[maybe_unused]] bool done_anything = false;
UInt64 lowest_seq = sequence.load();

done_anything |= wal->compactLogs();
done_anything |= wal->compactLogs(write_limiter, read_limiter);

{
// Cleanup released snapshots
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ class PageDirectory

std::set<PageId> getAllPageIds();

void apply(PageEntriesEdit && edit);
void apply(PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter = nullptr);

std::pair<std::map<BlobFileId, PageIdAndVersionedEntries>, PageSize>
getEntriesByBlobIds(const std::vector<BlobFileId> & blob_need_gc);

std::set<PageId> gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids);
std::set<PageId> gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids, const WriteLimiterPtr & write_limiter = nullptr);

std::vector<PageEntriesV3> gc();
std::vector<PageEntriesV3> gc(const WriteLimiterPtr & write_limiter = nullptr, const ReadLimiterPtr & read_limiter = nullptr);

size_t numPages() const
{
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void PageStorageImpl::restore()
collapsing_directory.apply(std::move(edit));
};
// Restore `collapsing_directory` from disk
auto wal = WALStore::create(callback, file_provider, delegator, /*write_limiter*/ nullptr);
auto wal = WALStore::create(callback, file_provider, delegator);
// PageId max_page_id = collapsing_directory.max_applied_page_id; // TODO: return it to outer function

// TODO: Now `PageDirectory::create` and `BlobStore::restore` iterate all entries in `collapsing_directory`,
Expand Down Expand Up @@ -78,7 +78,7 @@ void PageStorageImpl::write(DB::WriteBatch && write_batch, const WriteLimiterPtr

// Persist Page data to BlobStore
auto edit = blob_store.write(write_batch, write_limiter);
page_directory.apply(std::move(edit));
page_directory.apply(std::move(edit), write_limiter);
}

DB::PageEntry PageStorageImpl::getEntry(PageId page_id, SnapshotPtr snapshot)
Expand Down Expand Up @@ -178,7 +178,7 @@ void PageStorageImpl::traverse(const std::function<void(const DB::Page & page)>
}
}

bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limiter*/, const ReadLimiterPtr & /*read_limiter*/)
bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter)
{
// If another thread is running gc, just return;
bool v = false;
Expand All @@ -192,7 +192,7 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limi

// 1. Do the MVCC gc, clean up expired snapshot.
// And get the expired entries.
const auto & del_entries = page_directory.gc();
const auto & del_entries = page_directory.gc(write_limiter, read_limiter);

// 2. Remove the expired entries in BlobStore.
// It won't delete the data on the disk.
Expand Down Expand Up @@ -224,7 +224,7 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limi
// 5. Do the BlobStore GC
// After BlobStore GC, these entries will be migrated to a new blob.
// Then we should notify MVCC apply the change.
PageEntriesEdit gc_edit = blob_store.gc(blob_gc_info, total_page_size);
PageEntriesEdit gc_edit = blob_store.gc(blob_gc_info, total_page_size, write_limiter, read_limiter);
if (gc_edit.empty())
{
throw Exception("Something wrong after BlobStore GC.", ErrorCodes::LOGICAL_ERROR);
Expand All @@ -237,7 +237,7 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limi
// be reset to correct state during restore. If any exception thrown, then some BlobFiles
// will be remained as "read-only" files while entries in them are useless in actual.
// Those BlobFiles should be cleaned during next restore.
const auto & page_ids = page_directory.gcApply(std::move(gc_edit), external_pages_remover != nullptr);
const auto & page_ids = page_directory.gcApply(std::move(gc_edit), external_pages_remover != nullptr, write_limiter);

(void)page_ids;

Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,25 @@ LogFilenameSet WALStoreReader::listAllFiles(
return log_files;
}

WALStoreReaderPtr WALStoreReader::create(FileProviderPtr & provider, LogFilenameSet files)
WALStoreReaderPtr WALStoreReader::create(FileProviderPtr & provider, LogFilenameSet files, const ReadLimiterPtr & read_limiter)
{
auto reader = std::make_shared<WALStoreReader>(provider, std::move(files));
auto reader = std::make_shared<WALStoreReader>(provider, std::move(files), read_limiter);
reader->openNextFile();
return reader;
}

WALStoreReaderPtr WALStoreReader::create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator)
WALStoreReaderPtr WALStoreReader::create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator, const ReadLimiterPtr & read_limiter)
{
Poco::Logger * logger = &Poco::Logger::get("WALStore");
LogFilenameSet log_files = listAllFiles(delegator, logger);
return create(provider, std::move(log_files));
return create(provider, std::move(log_files), read_limiter);
}

WALStoreReader::WALStoreReader(FileProviderPtr & provider_, LogFilenameSet && files_)
WALStoreReader::WALStoreReader(FileProviderPtr & provider_, LogFilenameSet && files_, const ReadLimiterPtr & read_limiter_)
: provider(provider_)
, files(std::move(files_))
, next_reading_file(files.begin())
, read_limiter(read_limiter_)
, logger(&Poco::Logger::get("LogReader"))
{}

Expand Down Expand Up @@ -135,7 +136,7 @@ bool WALStoreReader::openNextFile()
EncryptionPath{parent_path, filename},
/*estimated_size*/ Format::BLOCK_SIZE,
/*aio_threshold*/ 0,
/*read_limiter*/ nullptr,
/*read_limiter*/ read_limiter,
/*buffer_size*/ Format::BLOCK_SIZE // Must be `Format::BLOCK_SIZE`
);
reader = std::make_unique<LogReader>(
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/Page/V3/WAL/WALReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ class WALStoreReader
public:
static LogFilenameSet listAllFiles(PSDiskDelegatorPtr & delegator, Poco::Logger * logger);

static WALStoreReaderPtr create(FileProviderPtr & provider, LogFilenameSet files);
static WALStoreReaderPtr create(FileProviderPtr & provider, LogFilenameSet files, const ReadLimiterPtr & read_limiter = nullptr);

static WALStoreReaderPtr create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator);
static WALStoreReaderPtr create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator, const ReadLimiterPtr & read_limiter = nullptr);

bool remained() const;

Expand All @@ -56,7 +56,7 @@ class WALStoreReader
return reader->getLogNumber();
}

WALStoreReader(FileProviderPtr & provider_, LogFilenameSet && files_);
WALStoreReader(FileProviderPtr & provider_, LogFilenameSet && files_, const ReadLimiterPtr & read_limiter_ = nullptr);

WALStoreReader(const WALStoreReader &) = delete;
WALStoreReader & operator=(const WALStoreReader &) = delete;
Expand All @@ -69,6 +69,7 @@ class WALStoreReader

const LogFilenameSet files;
LogFilenameSet::const_iterator next_reading_file;
const ReadLimiterPtr read_limiter;
std::unique_ptr<LogReader> reader;
Poco::Logger * logger;
};
Expand Down
Loading

0 comments on commit a89b3bb

Please sign in to comment.