From 85c502a9b34ea1042281b8cf431c234ed8a35049 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 11 Feb 2022 18:32:32 +0800 Subject: [PATCH] seri REF Signed-off-by: JaySon-Huang --- dbms/src/Storages/Page/V3/WAL/serialize.cpp | 79 +++++++++++++++- .../Page/V3/tests/gtest_wal_store.cpp | 91 +++++++++++++++++++ 2 files changed, 168 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/Page/V3/WAL/serialize.cpp b/dbms/src/Storages/Page/V3/WAL/serialize.cpp index 252f92ec9e3..f1aba060f36 100644 --- a/dbms/src/Storages/Page/V3/WAL/serialize.cpp +++ b/dbms/src/Storages/Page/V3/WAL/serialize.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -80,6 +81,74 @@ void deserializePutFrom([[maybe_unused]] const WriteBatch::WriteType record_type edit.appendRecord(rec); } +void serializeRefTo(const PageEntriesEdit::EditRecord & record, WriteBuffer & buf) +{ + assert(record.type == WriteBatch::WriteType::REF); + + writeIntBinary(record.type, buf); + + UInt32 flags = 0; + writeIntBinary(flags, buf); + writeIntBinary(record.page_id, buf); + writeIntBinary(record.ori_page_id, buf); + serializeVersionTo(record.version, buf); + writeIntBinary(record.entry.file_id, buf); + writeIntBinary(record.entry.offset, buf); + writeIntBinary(record.entry.size, buf); + writeIntBinary(record.entry.checksum, buf); + // fieldsOffset TODO: compression on `fieldsOffset` + writeIntBinary(record.entry.field_offsets.size(), buf); + for (const auto & [off, checksum] : record.entry.field_offsets) + { + writeIntBinary(off, buf); + writeIntBinary(checksum, buf); + } +} + +void deserializeRefFrom([[maybe_unused]] const WriteBatch::WriteType record_type, ReadBuffer & buf, PageEntriesEdit & edit) +{ + assert(record_type == WriteBatch::WriteType::REF); + + UInt32 flags = 0; + readIntBinary(flags, buf); + PageId page_id, ori_page_id; + readIntBinary(page_id, buf); + readIntBinary(ori_page_id, buf); + PageVersionType version; + deserializeVersionFrom(buf, version); + PageEntryV3 entry; + readIntBinary(entry.file_id, buf); + readIntBinary(entry.offset, buf); + readIntBinary(entry.size, buf); + readIntBinary(entry.checksum, buf); + // fieldsOffset + PageFieldOffsetChecksums field_offsets; + UInt64 size_field_offsets = 0; + readIntBinary(size_field_offsets, buf); + if (size_field_offsets != 0) + { + entry.field_offsets.reserve(size_field_offsets); + PageFieldOffset field_offset; + UInt64 field_checksum; + for (size_t i = 0; i < size_field_offsets; ++i) + { + readIntBinary(field_offset, buf); + readIntBinary(field_checksum, buf); + entry.field_offsets.emplace_back(field_offset, field_checksum); + } + } + + // All consider as put + PageEntriesEdit::EditRecord rec; + rec.type = WriteBatch::WriteType::PUT; + rec.page_id = page_id; + rec.ori_page_id = ori_page_id; + rec.version = version; + rec.entry = entry; + edit.appendRecord(rec); +} + + void serializePutExternalTo(const PageEntriesEdit::EditRecord & record, WriteBuffer & buf) { assert(record.type == WriteBatch::WriteType::PUT_EXTERNAL); @@ -142,11 +211,15 @@ void deserializeFrom(ReadBuffer & buf, PageEntriesEdit & edit) { case WriteBatch::WriteType::PUT: case WriteBatch::WriteType::UPSERT: - case WriteBatch::WriteType::REF: { deserializePutFrom(record_type, buf, edit); break; } + case WriteBatch::WriteType::REF: + { + deserializeRefFrom(record_type, buf, edit); + break; + } case WriteBatch::WriteType::DEL: { deserializeDelFrom(record_type, buf, edit); @@ -174,9 +247,11 @@ String serializeTo(const PageEntriesEdit & edit) { case DB::WriteBatch::WriteType::PUT: case DB::WriteBatch::WriteType::UPSERT: - case DB::WriteBatch::WriteType::REF: serializePutTo(record, buf); break; + case DB::WriteBatch::WriteType::REF: + serializeRefTo(record, buf); + break; case DB::WriteBatch::WriteType::DEL: serializeDelTo(record, buf); break; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp index 1832009c69e..5febdafa5d2 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -30,6 +31,7 @@ TEST(WALSeriTest, AllPuts) auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit)); ASSERT_EQ(deseri_edit.size(), 2); auto iter = edit.getRecords().begin(); + EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT); EXPECT_EQ(iter->page_id, 1); EXPECT_EQ(iter->version, ver20); EXPECT_TRUE(isSameEntry(iter->entry, entry_p1)); @@ -61,14 +63,17 @@ try auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit)); ASSERT_EQ(deseri_edit.size(), 6); auto iter = edit.getRecords().begin(); + EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT); EXPECT_EQ(iter->page_id, 3); EXPECT_EQ(iter->version, ver21); EXPECT_TRUE(isSameEntry(iter->entry, entry_p3)); iter++; + EXPECT_EQ(iter->type, WriteBatch::WriteType::REF); EXPECT_EQ(iter->page_id, 4); EXPECT_EQ(iter->version, ver21); EXPECT_TRUE(isSameEntry(iter->entry, entry_p3)); iter++; + EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT); EXPECT_EQ(iter->page_id, 5); EXPECT_EQ(iter->version, ver21); EXPECT_TRUE(isSameEntry(iter->entry, entry_p5)); @@ -87,6 +92,92 @@ try } CATCH +TEST(WALSeriTest, PutExternalsAndRefsAndDels) +try +{ + { + PageEntriesEdit edit; + edit.putExternal(10); + edit.ref(11, 10); + edit.ref(12, 10); + edit.del(10); + + PageVersionType ver21(/*seq=*/21); + for (auto & rec : edit.getMutRecords()) + rec.version = ver21; + + auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit)); + ASSERT_EQ(deseri_edit.size(), 4); + auto iter = edit.getRecords().begin(); + EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT_EXTERNAL); + EXPECT_EQ(iter->page_id, 10); + EXPECT_EQ(iter->version, ver21); + iter++; + EXPECT_EQ(iter->type, WriteBatch::WriteType::REF); + EXPECT_EQ(iter->page_id, 11); + EXPECT_EQ(iter->ori_page_id, 10); + EXPECT_EQ(iter->version, ver21); + iter++; + EXPECT_EQ(iter->type, WriteBatch::WriteType::REF); + EXPECT_EQ(iter->page_id, 12); + EXPECT_EQ(iter->ori_page_id, 10); + EXPECT_EQ(iter->version, ver21); + iter++; + EXPECT_EQ(iter->type, WriteBatch::WriteType::DEL); + EXPECT_EQ(iter->page_id, 10); + EXPECT_EQ(iter->version, ver21); + } + + { + PageEntriesEdit edit; + edit.del(11); + PageVersionType ver22(/*seq=*/22); + for (auto & rec : edit.getMutRecords()) + rec.version = ver22; + + auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit)); + ASSERT_EQ(deseri_edit.size(), 1); + auto iter = edit.getRecords().begin(); + EXPECT_EQ(iter->type, WriteBatch::WriteType::DEL); + EXPECT_EQ(iter->page_id, 11); + EXPECT_EQ(iter->version, ver22); + } + + { + PageEntriesEdit edit; + edit.putExternal(10); + edit.ref(11, 10); + edit.ref(12, 11); + edit.del(10); + + PageVersionType ver23(/*seq=*/23); + for (auto & rec : edit.getMutRecords()) + rec.version = ver23; + + auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit)); + ASSERT_EQ(deseri_edit.size(), 4); + auto iter = edit.getRecords().begin(); + EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT_EXTERNAL); + EXPECT_EQ(iter->page_id, 10); + EXPECT_EQ(iter->version, ver23); + iter++; + EXPECT_EQ(iter->type, WriteBatch::WriteType::REF); + EXPECT_EQ(iter->page_id, 11); + EXPECT_EQ(iter->ori_page_id, 10); + EXPECT_EQ(iter->version, ver23); + iter++; + EXPECT_EQ(iter->type, WriteBatch::WriteType::REF); + EXPECT_EQ(iter->page_id, 12); + EXPECT_EQ(iter->ori_page_id, 11); + EXPECT_EQ(iter->version, ver23); + iter++; + EXPECT_EQ(iter->type, WriteBatch::WriteType::DEL); + EXPECT_EQ(iter->page_id, 10); + EXPECT_EQ(iter->version, ver23); + } +} +CATCH + TEST(WALSeriTest, Upserts) { PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567};