Support big page PUT into blobstore. (#4882)

ref #3594

jiaqizho authored May 13, 2022
1 parent 9bfb02f commit 65bdafa
Showing 7 changed files with 616 additions and 71 deletions.
217 changes: 165 additions & 52 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -90,8 +90,16 @@ void BlobStore::registerPaths()
             if (blob_id != INVALID_BLOBFILE_ID)
             {
                 Poco::File blob(fmt::format("{}/{}", path, blob_name));
-                delegator->addPageFileUsedSize({blob_id, 0}, blob.getSize(), path, true);
-                blob_stats.createStatNotChecking(blob_id, lock_stats);
+                auto blob_size = blob.getSize();
+                delegator->addPageFileUsedSize({blob_id, 0}, blob_size, path, true);
+                if (blob_size > config.file_limit_size)
+                {
+                    blob_stats.createBigPageStatNotChecking(blob_id, lock_stats);
+                }
+                else
+                {
+                    blob_stats.createStatNotChecking(blob_id, lock_stats);
+                }
             }
             else
             {
@@ -101,20 +109,100 @@ void BlobStore::registerPaths()
     }
 }
 
+PageEntriesEdit BlobStore::handleLargeWrite(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter)
+{
+    auto ns_id = wb.getNamespaceId();
+    PageEntriesEdit edit;
+    for (auto & write : wb.getWrites())
+    {
+        switch (write.type)
+        {
+        case WriteBatch::WriteType::PUT:
+        {
+            ChecksumClass digest;
+            PageEntryV3 entry;
+
+            auto [blob_id, offset_in_file] = getPosFromStats(write.size);
+
+            entry.file_id = blob_id;
+            entry.size = write.size;
+            entry.tag = write.tag;
+            entry.offset = offset_in_file;
+            // padding size won't work on big write batch
+            entry.padded_size = 0;
+
+            BufferBase::Buffer data_buf = write.read_buffer->buffer();
+
+            digest.update(data_buf.begin(), write.size);
+            entry.checksum = digest.checksum();
+
+            UInt64 field_begin, field_end;
+
+            for (size_t i = 0; i < write.offsets.size(); ++i)
+            {
+                ChecksumClass field_digest;
+                field_begin = write.offsets[i].first;
+                field_end = (i == write.offsets.size() - 1) ? write.size : write.offsets[i + 1].first;
+
+                field_digest.update(data_buf.begin() + field_begin, field_end - field_begin);
+                write.offsets[i].second = field_digest.checksum();
+            }
+
+            if (!write.offsets.empty())
+            {
+                // we can swap from WriteBatch instead of copying
+                entry.field_offsets.swap(write.offsets);
+            }
+
+            try
+            {
+                auto blob_file = getBlobFile(blob_id);
+                blob_file->write(data_buf.begin(), offset_in_file, write.size, write_limiter);
+            }
+            catch (DB::Exception & e)
+            {
+                removePosFromStats(blob_id, offset_in_file, write.size);
+                LOG_FMT_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] write failed.", blob_id, offset_in_file, write.size);
+                throw e;
+            }
+
+            edit.put(buildV3Id(ns_id, write.page_id), entry);
+            break;
+        }
+        case WriteBatch::WriteType::DEL:
+        {
+            edit.del(buildV3Id(ns_id, write.page_id));
+            break;
+        }
+        case WriteBatch::WriteType::REF:
+        {
+            edit.ref(buildV3Id(ns_id, write.page_id), buildV3Id(ns_id, write.ori_page_id));
+            break;
+        }
+        case WriteBatch::WriteType::PUT_EXTERNAL:
+            edit.putExternal(buildV3Id(ns_id, write.page_id));
+            break;
+        case WriteBatch::WriteType::UPSERT:
+            throw Exception(fmt::format("Unknown write type: {}", write.type));
+        }
+    }
+
+    return edit;
+}
+
 PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter)
 {
     ProfileEvents::increment(ProfileEvents::PSMWritePages, wb.putWriteCount());
 
-    PageEntriesEdit edit;
     const size_t all_page_data_size = wb.getTotalDataSize();
 
     if (all_page_data_size > config.file_limit_size)
     {
-        throw Exception(fmt::format("Write batch is too large. It should less than [file_limit_size={}]",
-                                    config.file_limit_size.get()),
-                        ErrorCodes::LOGICAL_ERROR);
+        return handleLargeWrite(wb, write_limiter);
     }
 
+    PageEntriesEdit edit;
+
    auto ns_id = wb.getNamespaceId();
    if (all_page_data_size == 0)
    {
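Note on the hunk above: before this commit, write() rejected any batch whose total data size exceeded config.file_limit_size with a LOGICAL_ERROR; such a batch is now routed to the new handleLargeWrite path instead. A minimal sketch of that dispatch, using hypothetical stand-in types (the 256 MiB limit here is an assumption for illustration; the real code reads it from config.file_limit_size):

#include <cstddef>
#include <iostream>
#include <string>

// ToyConfig stands in for BlobStore's config; only the size check matters here.
struct ToyConfig
{
    std::size_t file_limit_size = 256UL * 1024 * 1024; // assumed 256 MiB cap per BlobFile
};

// Mirrors the new dispatch in BlobStore::write: oversized batches take the
// large-write path instead of throwing.
std::string writePath(const ToyConfig & config, std::size_t all_page_data_size)
{
    if (all_page_data_size > config.file_limit_size)
        return "handleLargeWrite";
    return "normal write";
}

int main()
{
    ToyConfig config;
    std::cout << writePath(config, 300UL << 20) << '\n'; // prints "handleLargeWrite"
    std::cout << writePath(config, 4096) << '\n';        // prints "normal write"
}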
@@ -167,7 +255,6 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
 
     size_t offset_in_allocated = 0;
 
-
    for (auto & write : wb.getWrites())
    {
        switch (write.type)
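For the per-field checksum loop in handleLargeWrite above: field i covers the byte range from offsets[i].first up to offsets[i + 1].first, and the last field runs to the end of the page. A self-contained sketch of that slicing, with a toy checksum standing in for ChecksumClass (the actual digest is not shown in this diff, so the real class is only assumed to expose update()/checksum()):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// (begin_offset, checksum) per field, like PageEntryV3::field_offsets.
using FieldOffsets = std::vector<std::pair<uint64_t, uint64_t>>;

// Toy checksum standing in for ChecksumClass's update()/checksum().
uint64_t toyChecksum(const char * data, std::size_t len)
{
    uint64_t sum = 0;
    for (std::size_t i = 0; i < len; ++i)
        sum = sum * 131 + static_cast<unsigned char>(data[i]);
    return sum;
}

// Mirrors the loop in handleLargeWrite: field i spans [offsets[i], offsets[i+1]),
// and the last field runs to the end of the page data.
void fillFieldChecksums(const char * buf, std::size_t total_size, FieldOffsets & offsets)
{
    for (std::size_t i = 0; i < offsets.size(); ++i)
    {
        uint64_t begin = offsets[i].first;
        uint64_t end = (i == offsets.size() - 1) ? total_size : offsets[i + 1].first;
        offsets[i].second = toyChecksum(buf + begin, end - begin);
    }
}

int main()
{
    const char page[] = "header|body-bytes|footer";
    FieldOffsets offsets = {{0, 0}, {7, 0}, {18, 0}}; // three fields
    fillFieldChecksums(page, sizeof(page) - 1, offsets);
    for (const auto & [begin, sum] : offsets)
        std::cout << begin << " -> " << sum << '\n';
}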
@@ -316,19 +403,29 @@ std::pair<BlobFileId, BlobFileOffset> BlobStore::getPosFromStats(size_t size)
 
     auto lock_stat = [size, this, &stat]() {
         auto lock_stats = blob_stats.lock();
-        BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
-        std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, lock_stats);
-        if (stat == nullptr)
+        if (size > config.file_limit_size)
         {
-            // No valid stat for puting data with `size`, create a new one
-            stat = blob_stats.createStat(blob_file_id, lock_stats);
+            auto blob_file_id = blob_stats.chooseBigStat(lock_stats);
+            stat = blob_stats.createBigStat(blob_file_id, lock_stats);
+
+            return stat->lock();
         }
+        else
+        {
+            BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
+            std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, lock_stats);
+            if (stat == nullptr)
+            {
+                // No valid stat for puting data with `size`, create a new one
+                stat = blob_stats.createStat(blob_file_id, lock_stats);
+            }
 
-        // We must get the lock from BlobStat under the BlobStats lock
-        // to ensure that BlobStat updates are serialized.
-        // Otherwise it may cause stat to fail to get the span for writing
-        // and throwing exception.
-        return stat->lock();
+            // We must get the lock from BlobStat under the BlobStats lock
+            // to ensure that BlobStat updates are serialized.
+            // Otherwise it may cause stat to fail to get the span for writing
+            // and throwing exception.
+            return stat->lock();
+        }
     }();
 
    // We need to assume that this insert will reduce max_cap.
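In the branch added above, a big write never shares a file: chooseBigStat returns the current roll_id, createBigStat registers a stat for that id, and the payload lands at offset 0 of the new file. A toy model of the two allocation paths (the bump pointer stands in for the real space map; all names here are illustrative, not the real BlobStats API):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <utility>

using BlobFileId = uint64_t;

// Toy model: big writes get a dedicated file at offset 0; normal writes append
// into a shared file (a bump pointer stands in for the real space map).
struct ToyBlobStats
{
    std::size_t file_limit_size = 256UL * 1024 * 1024;
    BlobFileId roll_id = 1;    // next blob file id to hand out
    BlobFileId small_file = 0; // current file for normal-sized writes
    std::size_t small_offset = 0;

    std::pair<BlobFileId, std::size_t> getPosFromStats(std::size_t size)
    {
        if (size > file_limit_size)
            return {roll_id++, 0}; // chooseBigStat + createBigStat: fresh file, offset 0
        if (small_file == 0 || small_offset + size > file_limit_size)
        {
            small_file = roll_id++; // createStat: open a new normal blob file
            small_offset = 0;
        }
        std::size_t offset = small_offset;
        small_offset += size;
        return {small_file, offset};
    }
};

int main()
{
    ToyBlobStats stats;
    auto [id1, off1] = stats.getPosFromStats(4096);        // file 1, offset 0
    auto [id2, off2] = stats.getPosFromStats(300UL << 20); // big page: file 2, offset 0
    auto [id3, off3] = stats.getPosFromStats(4096);        // back to file 1, offset 4096
    std::cout << id1 << ':' << off1 << ' ' << id2 << ':' << off2 << ' '
              << id3 << ':' << off3 << '\n';
}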
@@ -701,11 +798,12 @@ struct BlobStoreGCInfo
 {
     String toString() const
     {
-        return fmt::format("{}. {}. {}. {}. ",
+        return fmt::format("{}. {}. {}. {}. {}.",
                            toTypeString("Read-Only Blob", 0),
                            toTypeString("No GC Blob", 1),
                            toTypeString("Full GC Blob", 2),
-                           toTypeString("Truncated Blob", 3));
+                           toTypeString("Truncated Blob", 3),
+                           toTypeString("Big Blob", 4));
     }
 
    void appendToReadOnlyBlob(const BlobFileId blob_id, double valid_rate)
@@ -728,12 +826,18 @@ struct BlobStoreGCInfo
         blob_gc_info[3].emplace_back(std::make_pair(blob_id, valid_rate));
     }
 
+    void appendToBigBlob(const BlobFileId blob_id, double valid_rate)
+    {
+        blob_gc_info[4].emplace_back(std::make_pair(blob_id, valid_rate));
+    }
+
 private:
     // 1. read only blob
     // 2. no need gc blob
     // 3. full gc blob
     // 4. need truncate blob
-    std::vector<std::pair<BlobFileId, double>> blob_gc_info[4];
+    // 5. big blob
+    std::vector<std::pair<BlobFileId, double>> blob_gc_info[5];
 
    String toTypeString(const std::string_view prefix, const size_t index) const
    {
@@ -778,6 +882,13 @@ std::vector<BlobFileId> BlobStore::getGCStats()
             continue;
         }
 
+        if (stat->isBigBlob())
+        {
+            blobstore_gc_info.appendToBigBlob(stat->id, stat->sm_valid_rate);
+            LOG_FMT_TRACE(log, "Current [blob_id={}] is big-blob", stat->id);
+            continue;
+        }
+
         auto lock = stat->lock();
         auto right_margin = stat->smap->getRightMargin();
 
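Big blobs form their own GC bucket: each holds a single page that fills the file, so compacting it would only rewrite the same bytes, and getGCStats skips them with the continue added above (the file is reclaimed whole once its page dies; see removePosFromStat at the end of this diff). A simplified reading of the categorization, with indices matching BlobStoreGCInfo::blob_gc_info (the valid-rate threshold is illustrative, not the actual config value):

#include <cstdint>
#include <iostream>

// Category indices mirror BlobStoreGCInfo::blob_gc_info; the rules below are a
// simplified reading of getGCStats, not the exact implementation.
enum class GCCategory : uint8_t
{
    ReadOnly = 0,  // no longer written to, left alone
    NoGC = 1,      // healthy valid rate, nothing to do
    FullGC = 2,    // low valid rate, live data gets rewritten elsewhere
    Truncated = 3, // only the unused tail of the file is cut off
    BigBlob = 4    // one big page per file: never compacted, only dropped whole
};

GCCategory classify(bool read_only, bool big_blob, double valid_rate, double threshold = 0.5)
{
    if (read_only)
        return GCCategory::ReadOnly;
    if (big_blob)
        return GCCategory::BigBlob; // the early `continue` added in this hunk
    return valid_rate < threshold ? GCCategory::FullGC : GCCategory::NoGC;
}

int main()
{
    std::cout << static_cast<int>(classify(false, true, 1.0)) << '\n';  // 4: big blob
    std::cout << static_cast<int>(classify(false, false, 0.2)) << '\n'; // 2: full GC candidate
}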
@@ -1038,35 +1149,6 @@ std::pair<BlobFileId, String> BlobStore::BlobStats::getBlobIdFromName(String blo
     return {INVALID_BLOBFILE_ID, err_msg};
 }
 
-std::set<BlobFileId> BlobStore::BlobStats::getBlobIdsFromDisk(String path) const
-{
-    std::set<BlobFileId> blob_ids_on_disk;
-
-    Poco::File store_path(path);
-    if (!store_path.exists())
-    {
-        return blob_ids_on_disk;
-    }
-
-    std::vector<String> file_list;
-    store_path.list(file_list);
-
-    for (const auto & blob_name : file_list)
-    {
-        const auto & [blob_id, err_msg] = getBlobIdFromName(blob_name);
-        if (blob_id != INVALID_BLOBFILE_ID)
-        {
-            blob_ids_on_disk.insert(blob_id);
-        }
-        else
-        {
-            LOG_FMT_INFO(log, "Ignore not blob file [dir={}] [file={}] [err_msg={}]", path, blob_name, err_msg);
-        }
-    }
-
-    return blob_ids_on_disk;
-}
-
 void BlobStore::BlobStats::restore()
 {
    BlobFileId max_restored_file_id = 0;
@@ -1129,7 +1211,7 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std:
 
 BlobStatPtr BlobStore::BlobStats::createStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
 {
-    LOG_FMT_DEBUG(log, "Created a new BlobStat [blob_id={}]", blob_file_id);
+    LOG_FMT_INFO(log, "Created a new BlobStat [blob_id={}]", blob_file_id);
     BlobStatPtr stat = std::make_shared<BlobStat>(
         blob_file_id,
         static_cast<SpaceMap::SpaceMapType>(config.spacemap_type.get()),
@@ -1140,6 +1222,32 @@ BlobStatPtr BlobStore::BlobStats::createStatNotChecking(BlobFileId blob_file_id,
     return stat;
 }
 
+BlobStatPtr BlobStore::BlobStats::createBigStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & guard)
+{
+    auto stat = createBigPageStatNotChecking(blob_file_id, guard);
+    // Roll to the next new blob id
+    if (blob_file_id == roll_id)
+    {
+        roll_id++;
+    }
+
+    return stat;
+}
+
+BlobStatPtr BlobStore::BlobStats::createBigPageStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
+{
+    LOG_FMT_INFO(log, "Created a new big BlobStat [blob_id={}]", blob_file_id);
+    BlobStatPtr stat = std::make_shared<BlobStat>(
+        blob_file_id,
+        SpaceMap::SpaceMapType::SMAP64_BIG,
+        config.file_limit_size,
+        BlobStatType::BIG_BLOB);
+
+    PageFileIdAndLevel id_lvl{blob_file_id, 0};
+    stats_map[delegator->choosePath(id_lvl)].emplace_back(stat);
+    return stat;
+}
+
 void BlobStore::BlobStats::eraseStat(const BlobStatPtr && stat, const std::lock_guard<std::mutex> &)
 {
    PageFileIdAndLevel id_lvl{stat->id, 0};
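createBigStat reuses the roll-forward id scheme of createStat: when the id it registers equals roll_id, the counter advances, so the next chooseBigStat call hands out a fresh file id. A compact model of that bookkeeping (a hypothetical wrapper, not the real locking API):

#include <cstdint>
#include <iostream>

using BlobFileId = uint64_t;

// Minimal model of the roll_id bookkeeping shared by createStat and createBigStat.
struct ToyStats
{
    BlobFileId roll_id = 1;

    // chooseBigStat always proposes the next unused file id.
    BlobFileId chooseBigStat() const { return roll_id; }

    BlobFileId createBigStat(BlobFileId blob_file_id)
    {
        // ... the real code registers a BIG_BLOB stat for blob_file_id here ...
        if (blob_file_id == roll_id)
            ++roll_id; // roll to the next new blob id
        return blob_file_id;
    }
};

int main()
{
    ToyStats stats;
    std::cout << stats.createBigStat(stats.chooseBigStat()) << '\n'; // 1
    std::cout << stats.createBigStat(stats.chooseBigStat()) << '\n'; // 2
}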
@@ -1199,7 +1307,7 @@ std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_s
     for (const auto & stat : stats_iter->second)
     {
         auto lock = stat->lock(); // TODO: will it bring performance regression?
-        if (!stat->isReadOnly()
+        if (stat->isNormal()
             && stat->sm_max_caps >= buf_size
             && stat->sm_valid_rate < smallest_valid_rate)
         {
@@ -1234,6 +1342,11 @@ std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_s
     return std::make_pair(stat_ptr, INVALID_BLOBFILE_ID);
 }
 
+BlobFileId BlobStore::BlobStats::chooseBigStat(const std::lock_guard<std::mutex> &) const
+{
+    return roll_id;
+}
+
 BlobStatPtr BlobStore::BlobStats::blobIdToStat(BlobFileId file_id, bool ignore_not_exist)
 {
    auto guard = lock();
@@ -1315,7 +1428,7 @@ bool BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, si
 
     sm_valid_size -= buf_size;
     sm_valid_rate = sm_valid_size * 1.0 / sm_total_size;
-    return (isReadOnly() && sm_valid_size == 0);
+    return ((isReadOnly() || isBigBlob()) && sm_valid_size == 0);
 }
 
 void BlobStore::BlobStats::BlobStat::restoreSpaceMap(BlobFileOffset offset, size_t buf_size)
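The amended return value above matters for big blobs: the caller removes the whole BlobFile when removePosFromStat returns true, and since a big-blob file contains exactly one page, it becomes removable the moment that page's span is freed, just like a fully drained read-only file. A sketch of the condition (simplified; the real method also updates the space map and valid rate):

#include <cstddef>
#include <iostream>

// Simplified view of removePosFromStat's return value: "true" tells the caller
// the whole BlobFile holds no live data and can be removed from disk.
bool fileRemovableAfter(bool read_only, bool big_blob, std::size_t sm_valid_size)
{
    return (read_only || big_blob) && sm_valid_size == 0;
}

int main()
{
    std::cout << fileRemovableAfter(false, true, 0) << '\n';  // 1: drained big blob
    std::cout << fileRemovableAfter(false, true, 42) << '\n'; // 0: its page is still live
}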