diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index bf824fddd74..14d87f2c870 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -2,8 +2,10 @@ #include #include #include +#include #include +#include namespace ProfileEvents { @@ -90,7 +92,6 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr switch (write.type) { case WriteBatch::WriteType::PUT: - case WriteBatch::WriteType::UPSERT: { ChecksumClass digest; PageEntryV3 entry; @@ -124,16 +125,7 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr } buffer_pos += write.size; - - if (write.type == WriteBatch::WriteType::PUT) - { - edit.put(write.page_id, entry); - } - else // WriteBatch::WriteType::UPSERT - { - edit.upsertPage(write.page_id, entry); - } - + edit.put(write.page_id, entry); break; } case WriteBatch::WriteType::DEL: @@ -146,6 +138,8 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr edit.ref(write.page_id, write.ori_page_id); break; } + default: + throw Exception(fmt::format("Unknown write type: {}", write.type)); } } @@ -192,12 +186,12 @@ std::pair BlobStore::getPosFromStats(size_t size) { auto lock_stats = blob_stats.lock(); BlobFileId blob_file_id = INVALID_BLOBFILE_ID; - std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, config.file_limit_size); + std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, config.file_limit_size, lock_stats); // No valid stat for puting data with `size`, create a new one if (stat == nullptr) { - stat = blob_stats.createStat(blob_file_id); + stat = blob_stats.createStat(blob_file_id, lock_stats); } } @@ -362,13 +356,11 @@ std::vector BlobStore::getGCStats() return blob_need_gc; } -PageIdAndVersionedEntryList BlobStore::gc(std::map & entries_need_gc, - const PageSize & total_page_size, - const WriteLimiterPtr & write_limiter, - const ReadLimiterPtr & read_limiter) +PageEntriesEdit BlobStore::gc(std::map & entries_need_gc, + const PageSize & total_page_size, + const WriteLimiterPtr & write_limiter, + const ReadLimiterPtr & read_limiter) { - PageIdAndVersionedEntryList copy_list; - PageEntriesEdit edit; if (total_page_size == 0) @@ -407,12 +399,12 @@ PageIdAndVersionedEntryList BlobStore::gc(std::map BlobStore::BlobStats::lock() +std::lock_guard BlobStore::BlobStats::lock() const { return std::lock_guard(lock_stats); } -BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id) +BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std::lock_guard &) { BlobStatPtr stat = nullptr; @@ -508,7 +500,7 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id) return stat; } -void BlobStore::BlobStats::eraseStat(BlobFileId blob_file_id) +void BlobStore::BlobStats::eraseStat(BlobFileId blob_file_id, const std::lock_guard &) { BlobStatPtr stat = nullptr; bool found = false; @@ -553,7 +545,7 @@ BlobFileId BlobStore::BlobStats::chooseNewStat() return rv; } -std::pair BlobStore::BlobStats::chooseStat(size_t buf_size, UInt64 file_limit_size) +std::pair BlobStore::BlobStats::chooseStat(size_t buf_size, UInt64 file_limit_size, const std::lock_guard &) { BlobStatPtr stat_ptr = nullptr; double smallest_valid_rate = 2; @@ -638,6 +630,7 @@ void BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, si BlobStatPtr BlobStore::BlobStats::fileIdToStat(BlobFileId file_id) { + auto guard = lock(); for (auto & stat : stats_map) { if (stat->id == file_id) diff --git a/dbms/src/Storages/Page/V3/BlobStore.h b/dbms/src/Storages/Page/V3/BlobStore.h index 41cbfbb9c67..c5305189165 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.h +++ b/dbms/src/Storages/Page/V3/BlobStore.h @@ -19,7 +19,6 @@ extern const int LOGICAL_ERROR; namespace PS::V3 { -using PageIdAndVersionedEntryList = std::vector>; using PageIdAndVersionedEntries = std::vector>; class BlobStore : public Allocator @@ -100,11 +99,11 @@ class BlobStore : public Allocator public: BlobStats(Poco::Logger * log_, BlobStore::Config config); - std::lock_guard lock(); + std::lock_guard lock() const; - BlobStatPtr createStat(BlobFileId blob_file_id); + BlobStatPtr createStat(BlobFileId blob_file_id, const std::lock_guard &); - void eraseStat(BlobFileId blob_file_id); + void eraseStat(BlobFileId blob_file_id, const std::lock_guard &); /** * Choose a available `BlobStat` from `BlobStats`. @@ -120,7 +119,7 @@ class BlobStore : public Allocator * The `INVALID_BLOBFILE_ID` means that you don't need create a new `BlobFile`. * */ - std::pair chooseStat(size_t buf_size, UInt64 file_limit_size); + std::pair chooseStat(size_t buf_size, UInt64 file_limit_size, const std::lock_guard &); BlobFileId chooseNewStat(); @@ -128,6 +127,7 @@ class BlobStore : public Allocator std::list getStats() const { + auto guard = lock(); return stats_map; } @@ -140,7 +140,7 @@ class BlobStore : public Allocator BlobFileId roll_id = 0; std::list old_ids; std::list stats_map; - std::mutex lock_stats; + mutable std::mutex lock_stats; }; BlobStore(const FileProviderPtr & file_provider_, String path, BlobStore::Config config); @@ -149,10 +149,10 @@ class BlobStore : public Allocator std::vector getGCStats(); - PageIdAndVersionedEntryList gc(std::map & entries_need_gc, - const PageSize & total_page_size, - const WriteLimiterPtr & write_limiter = nullptr, - const ReadLimiterPtr & read_limiter = nullptr); + PageEntriesEdit gc(std::map & entries_need_gc, + const PageSize & total_page_size, + const WriteLimiterPtr & write_limiter = nullptr, + const ReadLimiterPtr & read_limiter = nullptr); PageEntriesEdit write(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr); @@ -202,4 +202,4 @@ class BlobStore : public Allocator using BlobStorePtr = std::shared_ptr; } // namespace PS::V3 -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp b/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp index 637659bdbd9..2bacb580622 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp +++ b/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,8 @@ LogReader::LogReader( , log_number(log_num_) , log(log_) { + // Must be `BLOCK_SIZE`, or we can not ensure the correctness of reading. + assert(file->internalBuffer().size() == Format::BLOCK_SIZE); } LogReader::~LogReader() = default; diff --git a/dbms/src/Storages/Page/V3/LogFile/LogReader.h b/dbms/src/Storages/Page/V3/LogFile/LogReader.h index 08b66d8d2a3..5f0cd6ad8a8 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogReader.h +++ b/dbms/src/Storages/Page/V3/LogFile/LogReader.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include #include diff --git a/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp b/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp index c54762b0c93..bab6ba0fb39 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp +++ b/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -21,6 +22,8 @@ LogWriter::LogWriter( , recycle_log_files(recycle_log_files_) , manual_flush(manual_flush_) { + // Must be `BLOCK_SIZE`, or we can not ensure the correctness of writing. + assert(dest->internalBuffer().size() == Format::BLOCK_SIZE); } LogWriter::~LogWriter() diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index c263cd5233b..9435066750d 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -19,6 +19,10 @@ extern const int PS_ENTRY_NO_VALID_VERSION; } // namespace ErrorCodes namespace PS::V3 { +/******************************** + * VersionedPageEntries methods * + ********************************/ + std::optional VersionedPageEntries::getEntry(UInt64 seq) const { auto page_lock = acquireLock(); @@ -32,7 +36,6 @@ std::optional VersionedPageEntries::getEntry(UInt64 seq) const return std::nullopt; } - std::pair VersionedPageEntries::getEntriesByBlobId(BlobFileId blob_id) { PageSize single_page_size = 0; @@ -103,6 +106,10 @@ std::pair VersionedPageEntries::deleteAndGC(UInt64 lowest_s return std::make_pair(std::move(del_entries), entries.empty() || (entries.size() == 1 && entries.begin()->second.is_delete)); } +/************************** + * PageDirectory methods * + *************************/ + PageDirectory::PageDirectory() : sequence(0) , log(getLogWithPrefix(nullptr, "PageDirectory")) @@ -117,7 +124,10 @@ void PageDirectory::restore() PageDirectorySnapshotPtr PageDirectory::createSnapshot() const { auto snap = std::make_shared(sequence.load()); - snapshots.emplace_back(std::weak_ptr(snap)); + { + std::lock_guard snapshots_lock(snapshots_mutex); + snapshots.emplace_back(std::weak_ptr(snap)); + } return snap; } @@ -179,10 +189,8 @@ void PageDirectory::apply(PageEntriesEdit && edit) std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline UInt64 last_sequence = sequence.load(); - auto snap = createSnapshot(); - - // stage 1, persisted the changes to WAL - // wal.apply(edit); + // stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0] + // wal->apply(edit, PageVersionType(last_sequence + 1, 0)); // stage 2, create entry version list for pageId. nothing need to be rollback std::unordered_map> updating_locks; @@ -269,23 +277,28 @@ void PageDirectory::apply(PageEntriesEdit && edit) sequence.fetch_add(1); } - -void PageDirectory::gcApply(const PageIdAndVersionedEntryList & migrated_entries) +void PageDirectory::gcApply(PageEntriesEdit && migrated_edit) { - for (const auto & [page_id, version, entry] : migrated_entries) + std::shared_lock read_lock(table_rw_mutex); + for (auto & record : migrated_edit.getMutRecords()) { - auto iter = mvcc_table_directory.find(page_id); - if (iter == mvcc_table_directory.end()) + auto iter = mvcc_table_directory.find(record.page_id); + if (unlikely(iter == mvcc_table_directory.end())) { - throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", page_id), ErrorCodes::LOGICAL_ERROR); + throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR); } - auto versioned_page = iter->second; - iter->second->acquireLock(); - iter->second->createNewVersion(version.sequence, version.epoch + 1, entry); + // Increase the epoch for migrated record. + record.version.epoch += 1; - // TBD: wal apply + // Append the gc version to version list + auto & versioned_entries = iter->second; + auto page_lock = versioned_entries->acquireLock(); + versioned_entries->createNewVersion(record.version.sequence, record.version.epoch, record.entry); } + + // Apply migrate edit into WAL with the increased epoch version + // wal->apply(migrated_edit); } std::pair, PageSize> @@ -330,17 +343,21 @@ std::vector PageDirectory::gc() UInt64 lowest_seq = sequence.load(); std::vector all_del_entries; - // Cleanup released snapshots - for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */) { - if (iter->expired()) - iter = snapshots.erase(iter); - else + // Cleanup released snapshots + std::lock_guard lock(snapshots_mutex); + for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */) { - lowest_seq = std::min(lowest_seq, iter->lock()->sequence); - ++iter; + if (iter->expired()) + iter = snapshots.erase(iter); + else + { + lowest_seq = std::min(lowest_seq, iter->lock()->sequence); + ++iter; + } } } + { std::unique_lock write_lock(table_rw_mutex); for (auto iter = mvcc_table_directory.begin(); iter != mvcc_table_directory.end(); /*empty*/) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index dc36f0467b9..88cf60bb60f 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -139,11 +139,11 @@ class PageDirectory void apply(PageEntriesEdit && edit); - std::vector gc(); - std::pair, PageSize> getEntriesByBlobIds(const std::vector & blob_need_gc); - void gcApply(const PageIdAndVersionedEntryList & migrated_entries); + void gcApply(PageEntriesEdit && migrated_edit); + + std::vector gc(); size_t numPages() const { @@ -157,9 +157,10 @@ class PageDirectory using MVCCMapType = std::unordered_map; MVCCMapType mvcc_table_directory; + mutable std::mutex snapshots_mutex; mutable std::list> snapshots; - WALStore wal; + WALStorePtr wal; LogWithPrefixPtr log; }; diff --git a/dbms/src/Storages/Page/V3/PageEntriesEdit.h b/dbms/src/Storages/Page/V3/PageEntriesEdit.h index 1a18b606187..e6449f85db0 100644 --- a/dbms/src/Storages/Page/V3/PageEntriesEdit.h +++ b/dbms/src/Storages/Page/V3/PageEntriesEdit.h @@ -60,11 +60,12 @@ class PageEntriesEdit records.emplace_back(record); } - void upsertPage(PageId page_id, const PageEntryV3 & entry) + void upsertPage(PageId page_id, const PageVersionType & ver, const PageEntryV3 & entry) { EditRecord record{}; record.type = WriteBatch::WriteType::UPSERT; record.page_id = page_id; + record.version = ver; record.entry = entry; records.emplace_back(record); } @@ -103,6 +104,7 @@ class PageEntriesEdit WriteBatch::WriteType type; PageId page_id; PageId ori_page_id; + PageVersionType version; PageEntryV3 entry; }; using EditRecords = std::vector; @@ -112,7 +114,7 @@ class PageEntriesEdit records.emplace_back(rec); } - EditRecords & getRecords() { return records; } + EditRecords & getMutRecords() { return records; } const EditRecords & getRecords() const { return records; } private: diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index 95efa0be3b7..418e70102b6 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -144,9 +144,8 @@ bool PageStorageImpl::gc(bool not_skip, const WriteLimiterPtr & write_limiter, c // 5. Do the BlobStore GC // After BlobStore GC, these entries will be migrated to a new blob. // Then we should notify MVCC apply the change. - const auto & copy_list = blob_store.gc(blob_gc_info, total_page_size); - - if (copy_list.empty()) + PageEntriesEdit gc_edit = blob_store.gc(blob_gc_info, total_page_size); + if (gc_edit.empty()) { throw Exception("Something wrong after BlobStore GC.", ErrorCodes::LOGICAL_ERROR); } @@ -154,7 +153,11 @@ bool PageStorageImpl::gc(bool not_skip, const WriteLimiterPtr & write_limiter, c // 6. MVCC gc apply // MVCC will apply the migrated entries. // Also it will generate a new version for these entries. - page_directory.gcApply(copy_list); + // Note that if the process crash between step 5 and step 6, the stats in BlobStore will + // be reset to correct state during restore. If any exception thrown, then some BlobFiles + // will be remained as "read-only" files while entries in them are useless in actual. + // Those BlobFiles should be cleaned during next restore. + page_directory.gcApply(std::move(gc_edit)); return true; } diff --git a/dbms/src/Storages/Page/V3/WALStore.cpp b/dbms/src/Storages/Page/V3/WALStore.cpp new file mode 100644 index 00000000000..dca17fbf457 --- /dev/null +++ b/dbms/src/Storages/Page/V3/WALStore.cpp @@ -0,0 +1,74 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ +extern const int NOT_IMPLEMENTED; +} // namespace ErrorCodes +} // namespace DB +namespace DB::PS::V3 +{ +WALStorePtr WALStore::create( + FileProviderPtr & /*provider*/, + PSDiskDelegatorPtr & /*delegator*/, + const WriteLimiterPtr & /*write_limiter*/) +{ + throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); +} + +WALStore::WALStore( + const String & path_, + const FileProviderPtr & provider_, + const WriteLimiterPtr & write_limiter_, + std::unique_ptr && cur_log) + : path(path_) + , provider(provider_) + , write_limiter(write_limiter_) + , log_file(std::move(cur_log)) + , logger(&Poco::Logger::get("WALStore")) +{ +} + +void WALStore::apply(PageEntriesEdit & edit, const PageVersionType & version) +{ + for (auto & r : edit.getMutRecords()) + { + r.version = version; + } + apply(edit); +} + +void WALStore::apply(const PageEntriesEdit & /*edit*/) +{ + throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); +} + +void WALStore::gc() +{ + throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); +} + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/WALStore.h b/dbms/src/Storages/Page/V3/WALStore.h index 328cce65e87..c1430fa2608 100644 --- a/dbms/src/Storages/Page/V3/WALStore.h +++ b/dbms/src/Storages/Page/V3/WALStore.h @@ -1,9 +1,20 @@ #pragma once +#include +#include +#include #include +#include + namespace DB { +class FileProvider; +using FileProviderPtr = std::shared_ptr; +class WriteLimiter; +using WriteLimiterPtr = std::shared_ptr; +class PSDiskDelegator; +using PSDiskDelegatorPtr = std::shared_ptr; namespace PS::V3 { enum class WALRecoveryMode : UInt8 @@ -46,8 +57,42 @@ enum class WALRecoveryMode : UInt8 SkipAnyCorruptedRecords = 0x03, }; +class WALStore; +using WALStorePtr = std::unique_ptr; + +class WALStoreReader; +using WALStoreReaderPtr = std::shared_ptr; + class WALStore { +public: + using ChecksumClass = Digest::CRC64; + + static WALStorePtr create( + FileProviderPtr & provider, + PSDiskDelegatorPtr & delegator, + const WriteLimiterPtr & write_limiter); + + void apply(PageEntriesEdit & edit, const PageVersionType & version); + void apply(const PageEntriesEdit & edit); + + static WALStoreReaderPtr createReader(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator); + + void gc(); + +private: + WALStore( + const String & path_, + const FileProviderPtr & provider_, + const WriteLimiterPtr & write_limiter_, + std::unique_ptr && cur_log); + + const String path; + const FileProviderPtr provider; + const WriteLimiterPtr write_limiter; + std::unique_ptr log_file; + + Poco::Logger * logger; }; } // namespace PS::V3 diff --git a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp index 3dcdfe55b9f..d48cf668960 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp @@ -34,18 +34,19 @@ TEST_F(BlobStoreTest, testStats) { BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config); - auto stat = stats.createStat(0); + + auto stat = stats.createStat(0, stats.lock()); ASSERT_TRUE(stat); ASSERT_TRUE(stat->smap); - stats.createStat(1); - stats.createStat(2); + stats.createStat(1, stats.lock()); + stats.createStat(2, stats.lock()); ASSERT_EQ(stats.stats_map.size(), 3); ASSERT_EQ(stats.roll_id, 3); - stats.eraseStat(0); - stats.eraseStat(1); + stats.eraseStat(0, stats.lock()); + stats.eraseStat(1, stats.lock()); ASSERT_EQ(stats.stats_map.size(), 1); ASSERT_EQ(stats.roll_id, 3); ASSERT_EQ(stats.old_ids.size(), 2); @@ -65,17 +66,17 @@ TEST_F(BlobStoreTest, testStat) BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config); - std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE); + std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, 0); ASSERT_FALSE(stat); // still 0 - std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE); + std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, 0); ASSERT_FALSE(stat); - stats.createStat(0); - std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE); + stats.createStat(0, stats.lock()); + std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, INVALID_BLOBFILE_ID); ASSERT_TRUE(stat); @@ -138,7 +139,7 @@ TEST_F(BlobStoreTest, testFullStats) BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config); - stat = stats.createStat(0); + stat = stats.createStat(0, stats.lock()); offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 1); ASSERT_EQ(offset, 0); @@ -152,17 +153,17 @@ TEST_F(BlobStoreTest, testFullStats) ASSERT_LE(stat->sm_valid_rate, 1); // Won't choose full one - std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE); + std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, 1); ASSERT_FALSE(stat); // A new stat can use - stat = stats.createStat(blob_file_id); + stat = stats.createStat(blob_file_id, stats.lock()); offset = stat->getPosFromStat(100); ASSERT_EQ(offset, 0); // Remove the stat which id is 0 , now remain the stat which id is 1 - stats.eraseStat(0); + stats.eraseStat(0, stats.lock()); // Then full the stat which id 1 offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 100); @@ -171,7 +172,7 @@ TEST_F(BlobStoreTest, testFullStats) // Then choose stat , it should return the stat id 0 // cause in this time , stat which id is 1 have been earsed, // and stat which id is 1 is full. - std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE); + std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, 0); ASSERT_FALSE(stat); } @@ -207,7 +208,7 @@ TEST_F(BlobStoreTest, testWriteRead) char c_buff_read[buff_size * buff_nums]; size_t index = 0; - for (auto & record : edit.getRecords()) + for (const auto & record : edit.getRecords()) { ASSERT_EQ(record.type, WriteBatch::WriteType::PUT); ASSERT_EQ(record.entry.offset, index * buff_size); @@ -229,7 +230,7 @@ TEST_F(BlobStoreTest, testWriteRead) page_id = 50; PageIDAndEntriesV3 entries = {}; - for (auto & record : edit.getRecords()) + for (const auto & record : edit.getRecords()) { entries.emplace_back(std::make_pair(page_id++, record.entry)); } @@ -299,7 +300,7 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead) char c_buff_read[buff_size * buff_nums]; size_t index = 0; - for (auto & record : edit.getRecords()) + for (const auto & record : edit.getRecords()) { ASSERT_EQ(record.type, WriteBatch::WriteType::PUT); ASSERT_EQ(record.entry.offset, index * buff_size); @@ -307,7 +308,7 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead) ASSERT_EQ(record.entry.file_id, 0); PageFieldSizes check_field_sizes; - for (auto & [field_offset, crc] : record.entry.field_offsets) + for (const auto & [field_offset, crc] : record.entry.field_offsets) { check_field_sizes.emplace_back(field_offset); ASSERT_TRUE(crc); @@ -329,6 +330,7 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead) } TEST_F(BlobStoreTest, testWrite) +try { const auto file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); auto blob_store = BlobStore(file_provider, path, config); @@ -350,12 +352,7 @@ TEST_F(BlobStoreTest, testWrite) ReadBufferPtr buff2 = std::make_shared(c_buff2, buff_size); wb.putPage(page_id, /*tag*/ 0, buff1, buff_size); - wb.upsertPage(page_id, - /*tag*/ 0, - /*PageFileIdAndLevel*/ {}, - buff2, - buff_size, - /*PageFieldOffsetChecksums*/ {}); + wb.putPage(page_id, /*tag*/ 0, buff2, buff_size); PageEntriesEdit edit = blob_store.write(wb, nullptr); ASSERT_EQ(edit.size(), 2); @@ -370,7 +367,7 @@ TEST_F(BlobStoreTest, testWrite) ASSERT_EQ(record.entry.file_id, 0); record = records[1]; - ASSERT_EQ(record.type, WriteBatch::WriteType::UPSERT); + ASSERT_EQ(record.type, WriteBatch::WriteType::PUT); ASSERT_EQ(record.page_id, page_id); ASSERT_EQ(record.entry.offset, buff_size); ASSERT_EQ(record.entry.size, buff_size); @@ -437,6 +434,7 @@ TEST_F(BlobStoreTest, testWrite) ASSERT_EQ(record.page_id, page_id); } } +CATCH TEST_F(BlobStoreTest, testWriteOutOfLimitSize) { @@ -518,7 +516,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats) WriteBatch wb; { - char c_buff[buff_size]; + char c_buff[buff_size * buff_nums]; for (size_t i = 0; i < buff_nums; ++i) { c_buff[i * buff_size] = static_cast((0xff) + i); @@ -598,7 +596,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats2) WriteBatch wb; { - char c_buff[buff_size]; + char c_buff[buff_size * buff_nums]; for (size_t i = 0; i < buff_nums; ++i) { c_buff[i * buff_size] = static_cast((0xff) + i); @@ -685,18 +683,17 @@ TEST_F(BlobStoreTest, GC) auto stat = blob_store.blob_stats.fileIdToStat(0); stat->changeToReadOnly(); - const auto & copy_list = blob_store.gc(gc_context, static_cast(buff_size * buff_nums)); + const auto & gc_edit = blob_store.gc(gc_context, static_cast(buff_size * buff_nums)); // Check copy_list which will apply fo Mvcc - ASSERT_EQ(copy_list.size(), buff_nums); + ASSERT_EQ(gc_edit.size(), buff_nums); auto it = versioned_entries.begin(); - for (const auto & [page_id_, version_type, entry_] : copy_list) + for (const auto & record : gc_edit.getRecords()) { - (void)version_type; - ASSERT_EQ(page_id_, page_id); - ASSERT_EQ(entry_.file_id, 1); - ASSERT_EQ(it->second.checksum, entry_.checksum); - ASSERT_EQ(it->second.size, entry_.size); + ASSERT_EQ(record.page_id, page_id); + ASSERT_EQ(record.entry.file_id, 1); + ASSERT_EQ(record.entry.checksum, it->second.checksum); + ASSERT_EQ(record.entry.size, it->second.size); it++; } diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 27fbb07a750..76443a14a03 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -967,7 +968,7 @@ try EXPECT_EQ(entries_in_file2.size(), 2); // 2 page entries list EXPECT_EQ(entries_in_file3.size(), 1); // 1 page entries list - PageIdAndVersionedEntryList gc_migrate_entries; + PageEntriesEdit gc_migrate_entries; for (const auto & [file_id, entries] : candidate_entries_1.first) { (void)file_id; @@ -975,7 +976,7 @@ try { for (const auto & [ver, entry] : version_entries) { - gc_migrate_entries.emplace_back(page_id, ver, entry); + gc_migrate_entries.upsertPage(page_id, ver, entry); } } } @@ -986,13 +987,13 @@ try { for (const auto & [ver, entry] : version_entries) { - gc_migrate_entries.emplace_back(page_id, ver, entry); + gc_migrate_entries.upsertPage(page_id, ver, entry); } } } // Full GC execute apply - dir.gcApply(gc_migrate_entries); + dir.gcApply(std::move(gc_migrate_entries)); } CATCH @@ -1028,7 +1029,7 @@ try // `page_id` get removed EXPECT_EQ(dir.numPages(), 1); - PageIdAndVersionedEntryList gc_migrate_entries; + PageEntriesEdit gc_migrate_entries; for (const auto & [file_id, entries] : candidate_entries_1.first) { (void)file_id; @@ -1036,7 +1037,7 @@ try { for (const auto & [ver, entry] : version_entries) { - gc_migrate_entries.emplace_back(page_id, ver, entry); + gc_migrate_entries.upsertPage(page_id, ver, entry); } } } @@ -1047,13 +1048,13 @@ try { for (const auto & [ver, entry] : version_entries) { - gc_migrate_entries.emplace_back(page_id, ver, entry); + gc_migrate_entries.upsertPage(page_id, ver, entry); } } } // 1.2 Full GC execute apply - ASSERT_THROW({ dir.gcApply(gc_migrate_entries); }, DB::Exception); + ASSERT_THROW({ dir.gcApply(std::move(gc_migrate_entries)); }, DB::Exception); } CATCH diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp index 4c34d8ac419..a9751cefa5a 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp @@ -76,7 +76,7 @@ class StringSink : public DB::WriteBufferFromFileBase String & contents; explicit StringSink(String & contents_) - : DB::WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0) + : DB::WriteBufferFromFileBase(Format::BLOCK_SIZE, nullptr, 0) , contents(contents_) {} @@ -102,7 +102,7 @@ class OverwritingStringSink : public DB::WriteBufferFromFileBase String & contents; explicit OverwritingStringSink(String & contents_) - : DB::WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0) + : DB::WriteBufferFromFileBase(Format::BLOCK_SIZE, nullptr, 0) , contents(contents_) , last_sync_pos(0) {}