Fix some bug in mvcc and blobstore #3905

Merged: 11 commits, merged on Jan 21, 2022
Changes from 1 commit
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/PageDefines.h
@@ -50,7 +50,7 @@ using PageSize = UInt64;

using BlobFileId = UInt32;
using BlobFileOffset = UInt64;
static constexpr BlobFileId INVALID_BLOBFILE_ID = std::numeric_limits<BlobFileId>::max();
static constexpr BlobFileId INVALID_BLOBFILE_ID = 0;
static constexpr BlobFileOffset INVALID_BLOBFILE_OFFSET = std::numeric_limits<BlobFileOffset>::max();

struct ByteBuffer
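With this change, 0 becomes the invalid sentinel for blob file ids, which lines up with `roll_id` starting at 1 in BlobStore.h below. A minimal sketch of the convention, using hypothetical helper names that are not part of this patch:

```cpp
#include <cstdint>
#include <limits>

using BlobFileId = uint32_t;
using BlobFileOffset = uint64_t;

// After this patch, 0 never names a real blob file, so it can serve as the sentinel,
// while the offset sentinel stays at the maximum value.
static constexpr BlobFileId INVALID_BLOBFILE_ID = 0;
static constexpr BlobFileOffset INVALID_BLOBFILE_OFFSET = std::numeric_limits<BlobFileOffset>::max();

// Hypothetical id generator illustrating the convention: real ids start at 1.
struct IdRoller
{
    BlobFileId roll_id = 1;
    BlobFileId next() { return roll_id++; } // never hands out INVALID_BLOBFILE_ID
};

static bool isValidBlobFileId(BlobFileId id)
{
    return id != INVALID_BLOBFILE_ID;
}
```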
35 changes: 32 additions & 3 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -70,6 +70,14 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
edit.ref(write.page_id, write.ori_page_id);
break;
}
case WriteBatch::WriteType::PUT:
{ // Only putExternal won't have data.
PageEntryV3 entry;
entry.tag = write.tag;

edit.put(write.page_id, entry);
break;
}
default:
throw Exception("write batch have a invalid total size.",
ErrorCodes::LOGICAL_ERROR);
@@ -100,6 +108,7 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr

entry.file_id = blob_id;
entry.size = write.size;
entry.tag = write.tag;
entry.offset = offset_in_file + offset_in_allocated;
offset_in_allocated += write.size;

@@ -193,23 +202,33 @@ std::pair<BlobFileId, BlobFileOffset> BlobStore::getPosFromStats(size_t size)
{
stat = blob_stats.createStat(blob_file_id, lock_stats);
}

// We must acquire the BlobStat lock while still holding the BlobStats lock.
// This ensures that BlobStat updates happen in order, and it does not incur extra overhead.
// If BlobStat updates were not ordered, concurrent writers could observe an inconsistent state.

stat->sm_lock.lock();
}

// Get position from the single stat
auto lock_stat = stat->lock();
auto old_max_cap = stat->sm_max_caps;
BlobFileOffset offset = stat->getPosFromStat(size);

// Can't insert into this spacemap
if (offset == INVALID_BLOBFILE_OFFSET)
{
stat->sm_lock.unlock();
stat->smap->logStats();
throw Exception(fmt::format("Get postion from BlobStat failed, it may caused by `sm_max_caps` is no corrent. [size={}, max_caps={}, BlobFileId={}]",
throw Exception(fmt::format("Get postion from BlobStat failed, it may caused by `sm_max_caps` is no corrent. [size={}, old_max_caps={}, max_caps(updated)={}, BlobFileId={}]",
size,
old_max_cap,
stat->sm_max_caps,
stat->id),
ErrorCodes::LOGICAL_ERROR);
}

stat->sm_lock.unlock();
return std::make_pair(stat->id, offset);
}
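A condensed sketch of the locking order described in the comment above, using simplified stand-in types rather than the real BlobStats/BlobStat classes: the per-stat mutex is taken while the stats-list mutex is still held, so per-stat updates follow the order in which stats are handed out.

```cpp
#include <mutex>

// Simplified stand-ins, not the real TiFlash types.
struct MiniStat
{
    std::mutex sm_lock;
    unsigned long long used_bytes = 0;
};

struct MiniStats
{
    std::mutex lock_stats;
    MiniStat stat; // a single stat is enough to show the ordering

    // Returns the chosen stat with its own lock already held.
    MiniStat * acquireForWrite()
    {
        std::lock_guard<std::mutex> guard(lock_stats); // outer (list) lock
        MiniStat * chosen = &stat;                      // choose or create under the outer lock
        chosen->sm_lock.lock();                         // inner lock taken before the outer is released
        return chosen;                                  // caller must unlock chosen->sm_lock
    }
};

void writeSomeBytes(MiniStats & stats, unsigned long long size)
{
    MiniStat * stat = stats.acquireForWrite();
    stat->used_bytes += size; // serialized per stat, in the order stats were handed out
    stat->sm_lock.unlock();
}
```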

@@ -328,7 +347,11 @@ std::vector<BlobFileId> BlobStore::getGCStats()
auto right_margin = stat->smap->getRightMargin();

stat->sm_valid_rate = stat->sm_valid_size * 1.0 / right_margin;
assert(stat->sm_valid_rate <= 1.0);
if (stat->sm_valid_rate > 1.0)
{
LOG_FMT_ERROR(log, "Current blob got an invalid rate {:.2f}, total size is {}, valid size is {}, right margin is {}", stat->sm_valid_rate, stat->sm_total_size, stat->sm_valid_size, right_margin);
assert(false);
}

// Check if GC is required
if (stat->sm_valid_rate <= config.heavy_gc_valid_rate)
@@ -573,6 +596,12 @@ std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_s
return std::make_pair(nullptr, chooseNewStat());
}

// We need to assume that this insert will reduce max_cap,
// because other threads may also be waiting on BlobStats::chooseStat during this time.
// If max_cap were not reduced, the same BlobStat could accept multiple buffers and exceed its max_cap.
// After the BlobStore records the buffer size, max_caps will be updated accurately,
// so reducing max_caps here causes no problem.
stat_ptr->sm_max_caps -= buf_size;
return std::make_pair(stat_ptr, INVALID_BLOBFILE_ID);
}
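To illustrate the reasoning in the comment above, here is a small sketch (hypothetical MiniStat/MiniStats types, not the actual chooseStat signature) of why the capacity is reserved eagerly: if max_cap were left untouched until the write completes, two concurrent callers could both pick the same stat and jointly overflow it.

```cpp
#include <cstdint>
#include <mutex>

struct MiniStat
{
    uint64_t sm_max_caps; // remaining capacity estimate
};

struct MiniStats
{
    std::mutex lock_stats;
    MiniStat stat{1024};

    // Returns the stat if it can hold buf_size, reserving the space immediately so the
    // next caller (serialized on lock_stats) sees the reduced capacity and cannot over-commit.
    MiniStat * choose(uint64_t buf_size)
    {
        std::lock_guard<std::mutex> guard(lock_stats);
        if (stat.sm_max_caps < buf_size)
            return nullptr;           // caller would roll a new blob file instead
        stat.sm_max_caps -= buf_size; // eager reservation; refined once the write records its size
        return &stat;
    }
};
```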

2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.h
@@ -137,7 +137,7 @@ class BlobStore : public Allocator<false>
Poco::Logger * log;
BlobStore::Config config;

BlobFileId roll_id = 0;
BlobFileId roll_id = 1;
std::list<BlobFileId> old_ids;
std::list<BlobStatPtr> stats_map;
mutable std::mutex lock_stats;
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/PageEntry.h
@@ -9,6 +9,7 @@ struct PageEntryV3
public:
BlobFileId file_id = 0; // The id of the blob file the page data is persisted in
PageSize size = 0; // The size of page data
UInt64 tag = 0;
BlobFileOffset offset = 0; // The offset of page data in file
UInt64 checksum = 0; // The checksum of whole page data

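The new tag field stores what BlobStore::write now copies from each write (both in the data-less PUT path and in the regular path above). A minimal sketch of that copy, with simplified stand-in types rather than the real PageEntryV3 and WriteBatch entry:

```cpp
#include <cstdint>

// Simplified stand-ins for PageEntryV3 and a write-batch entry.
struct MiniEntry
{
    uint32_t file_id = 0;
    uint64_t size = 0;
    uint64_t tag = 0;    // newly carried through from the write batch
    uint64_t offset = 0;
    uint64_t checksum = 0;
};

struct MiniWrite
{
    uint64_t tag = 0;
    uint64_t size = 0;
};

// Mirrors the assignments added in BlobStore::write.
MiniEntry makeEntry(const MiniWrite & write, uint32_t blob_id, uint64_t offset_in_file)
{
    MiniEntry entry;
    entry.file_id = blob_id;
    entry.size = write.size;
    entry.tag = write.tag; // the line this patch adds
    entry.offset = offset_in_file;
    return entry;
}
```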
71 changes: 39 additions & 32 deletions dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp
@@ -67,12 +67,12 @@ TEST_F(BlobStoreTest, testStat)
BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config);

std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock());
ASSERT_EQ(blob_file_id, 0);
ASSERT_EQ(blob_file_id, 1);
ASSERT_FALSE(stat);

// still the same blob file id
std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock());
ASSERT_EQ(blob_file_id, 0);
ASSERT_EQ(blob_file_id, 1);
ASSERT_FALSE(stat);

stats.createStat(0, stats.lock());
@@ -139,7 +139,7 @@ TEST_F(BlobStoreTest, testFullStats)

BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config);

stat = stats.createStat(0, stats.lock());
stat = stats.createStat(1, stats.lock());
offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 1);
ASSERT_EQ(offset, 0);

@@ -154,7 +154,7 @@ TEST_F(BlobStoreTest, testFullStats)

// Won't choose full one
std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE, stats.lock());
ASSERT_EQ(blob_file_id, 1);
ASSERT_EQ(blob_file_id, 2);
ASSERT_FALSE(stat);

// A new stat can be used
@@ -163,17 +163,17 @@
ASSERT_EQ(offset, 0);

// Remove the stat whose id is 1; the stat whose id is 2 now remains
stats.eraseStat(0, stats.lock());
stats.eraseStat(1, stats.lock());

// Then fill the stat whose id is 1
// Then fill the stat whose id is 2
offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 100);
ASSERT_EQ(offset, 100);

// Then choose a stat; it should return the stat with id 0
// Then choose a stat; it should return the stat with id 1
// because by this time the stat with id 1 has been erased,
// and the stat with id 1 is full.
// and the stat with id 2 is full.
std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE, stats.lock());
ASSERT_EQ(blob_file_id, 0);
ASSERT_EQ(blob_file_id, 1);
ASSERT_FALSE(stat);
}

Expand Down Expand Up @@ -213,7 +213,7 @@ TEST_F(BlobStoreTest, testWriteRead)
ASSERT_EQ(record.type, WriteBatch::WriteType::PUT);
ASSERT_EQ(record.entry.offset, index * buff_size);
ASSERT_EQ(record.entry.size, buff_size);
ASSERT_EQ(record.entry.file_id, 0);
ASSERT_EQ(record.entry.file_id, 1);

// Read directly from the file
blob_store.read(record.entry.file_id,
@@ -238,7 +238,7 @@ TEST_F(BlobStoreTest, testWriteRead)
// Test `PageMap` read
page_id = 50;
index = 0;
auto page_map = blob_store.read(entries, /* ReadLimiterPtr */ nullptr);
auto page_map = blob_store.read(entries);
for (auto & [id, page] : page_map)
{
ASSERT_EQ(id, page_id++);
@@ -252,7 +252,7 @@ TEST_F(BlobStoreTest, testWriteRead)
index = 0;
for (auto & entry : entries)
{
auto page = blob_store.read(entry, /* ReadLimiterPtr */ nullptr);
auto page = blob_store.read(entry);
ASSERT_EQ(page.data.size(), buff_size);
ASSERT_EQ(strncmp(c_buff + index * buff_size, page.data.begin(), page.data.size()), 0);
index++;
@@ -305,7 +305,7 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead)
ASSERT_EQ(record.type, WriteBatch::WriteType::PUT);
ASSERT_EQ(record.entry.offset, index * buff_size);
ASSERT_EQ(record.entry.size, buff_size);
ASSERT_EQ(record.entry.file_id, 0);
ASSERT_EQ(record.entry.file_id, 1);

PageFieldSizes check_field_sizes;
for (const auto & [field_offset, crc] : record.entry.field_offsets)
@@ -364,14 +364,14 @@ try
ASSERT_EQ(record.page_id, page_id);
ASSERT_EQ(record.entry.offset, 0);
ASSERT_EQ(record.entry.size, buff_size);
ASSERT_EQ(record.entry.file_id, 0);
ASSERT_EQ(record.entry.file_id, 1);

record = records[1];
ASSERT_EQ(record.type, WriteBatch::WriteType::PUT);
ASSERT_EQ(record.page_id, page_id);
ASSERT_EQ(record.entry.offset, buff_size);
ASSERT_EQ(record.entry.size, buff_size);
ASSERT_EQ(record.entry.file_id, 0);
ASSERT_EQ(record.entry.file_id, 1);
}


@@ -422,7 +422,7 @@ try
ASSERT_EQ(record.page_id, page_id);
ASSERT_EQ(record.entry.offset, buff_size * 2);
ASSERT_EQ(record.entry.size, buff_size);
ASSERT_EQ(record.entry.file_id, 0);
ASSERT_EQ(record.entry.file_id, 1);

record = records[1];
ASSERT_EQ(record.type, WriteBatch::WriteType::REF);
@@ -487,7 +487,7 @@ TEST_F(BlobStoreTest, testWriteOutOfLimitSize)
ASSERT_EQ(record.page_id, 50);
ASSERT_EQ(record.entry.offset, 0);
ASSERT_EQ(record.entry.size, buf_size);
ASSERT_EQ(record.entry.file_id, 0);
ASSERT_EQ(record.entry.file_id, 1);

wb.clear();
wb.putPage(51, /*tag*/ 0, buff2, buf_size);
@@ -500,7 +500,7 @@ TEST_F(BlobStoreTest, testWriteOutOfLimitSize)
ASSERT_EQ(record.page_id, 51);
ASSERT_EQ(record.entry.offset, 0);
ASSERT_EQ(record.entry.size, buf_size);
ASSERT_EQ(record.entry.file_id, 1);
ASSERT_EQ(record.entry.file_id, 2);
}
}

@@ -519,7 +519,10 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats)
char c_buff[buff_size * buff_nums];
for (size_t i = 0; i < buff_nums; ++i)
{
c_buff[i * buff_size] = static_cast<char>((0xff) + i);
for (size_t j = 0; j < buff_size; ++j)
{
c_buff[j + i * buff_size] = static_cast<char>((j & 0xff) + i);
}
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(const_cast<char *>(c_buff + i * buff_size), buff_size);
wb.putPage(page_id, /* tag */ 0, buff, buff_size);
}
@@ -555,9 +558,9 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats)
// After remove `entries_del1`.
// Remain entries index [0, 2, 5, 6, 8]
blob_store.remove(entries_del1);
ASSERT_EQ(entries_del1.begin()->file_id, 0);
ASSERT_EQ(entries_del1.begin()->file_id, 1);

auto stat = blob_store.blob_stats.fileIdToStat(0);
auto stat = blob_store.blob_stats.fileIdToStat(1);

ASSERT_EQ(stat->sm_valid_rate, 0.5);
ASSERT_EQ(stat->sm_total_size, buff_size * buff_nums);
@@ -580,7 +583,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats)
ASSERT_EQ(stat->sm_valid_size, buff_size * 3);

// Check disk file have been truncate to right margin
String path = blob_store.getBlobFilePath(0);
String path = blob_store.getBlobFilePath(1);
Poco::File blob_file_in_disk(path);
ASSERT_EQ(blob_file_in_disk.getSize(), stat->sm_total_size);
}
@@ -599,7 +602,10 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats2)
char c_buff[buff_size * buff_nums];
for (size_t i = 0; i < buff_nums; ++i)
{
c_buff[i * buff_size] = static_cast<char>((0xff) + i);
for (size_t j = 0; j < buff_size; ++j)
{
c_buff[j + i * buff_size] = static_cast<char>((j & 0xff) + i);
}
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(const_cast<char *>(c_buff + i * buff_size), buff_size);
wb.putPage(page_id, /* tag */ 0, buff, buff_size);
}
@@ -627,7 +633,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats2)
// Remain entries index [8, 9].
blob_store.remove(entries_del);

auto stat = blob_store.blob_stats.fileIdToStat(0);
auto stat = blob_store.blob_stats.fileIdToStat(1);

const auto & gc_stats = blob_store.getGCStats();
ASSERT_FALSE(gc_stats.empty());
@@ -637,7 +643,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats2)
ASSERT_EQ(stat->sm_valid_size, buff_size * 2);

// Then we must do heavy GC
ASSERT_EQ(*gc_stats.begin(), 0);
ASSERT_EQ(*gc_stats.begin(), 1);
}


@@ -677,10 +683,10 @@ TEST_F(BlobStoreTest, GC)
PageIdAndVersionedEntries versioned_pageid_entries;
versioned_pageid_entries.emplace_back(std::make_pair(page_id, versioned_entries));
std::map<BlobFileId, PageIdAndVersionedEntries> gc_context;
gc_context[0] = versioned_pageid_entries;
gc_context[1] = versioned_pageid_entries;

// Before we run BlobStore GC, we need to change BlobFile1 to read-only
auto stat = blob_store.blob_stats.fileIdToStat(0);
auto stat = blob_store.blob_stats.fileIdToStat(1);
stat->changeToReadOnly();

const auto & gc_edit = blob_store.gc(gc_context, static_cast<PageSize>(buff_size * buff_nums));
@@ -691,18 +697,19 @@
for (const auto & record : gc_edit.getRecords())
{
ASSERT_EQ(record.page_id, page_id);
ASSERT_EQ(record.entry.file_id, 1);
ASSERT_EQ(record.entry.file_id, 2);
ASSERT_EQ(record.entry.checksum, it->second.checksum);
ASSERT_EQ(record.entry.size, it->second.size);
it++;
}

// Check the blob files on disk
Poco::File file0(blob_store.getBlobFilePath(0));
Poco::File file1(blob_store.getBlobFilePath(1));
ASSERT_TRUE(file0.exists());
Poco::File file2(blob_store.getBlobFilePath(2));
ASSERT_TRUE(file1.exists());
ASSERT_EQ(file0.getSize(), file1.getSize());
ASSERT_TRUE(file2.exists());
ASSERT_EQ(file1.getSize(), file2.getSize());
}

} // namespace DB::PS::V3::tests