Skip to content

Commit

Permalink
seri REF
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <tshent@qq.com>
  • Loading branch information
JaySon-Huang committed Feb 11, 2022
1 parent 731d652 commit 85c502a
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 2 deletions.
79 changes: 77 additions & 2 deletions dbms/src/Storages/Page/V3/WAL/serialize.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/Page/V3/WAL/serialize.h>
#include <Storages/Page/WriteBatch.h>

Expand Down Expand Up @@ -80,6 +81,74 @@ void deserializePutFrom([[maybe_unused]] const WriteBatch::WriteType record_type
edit.appendRecord(rec);
}

void serializeRefTo(const PageEntriesEdit::EditRecord & record, WriteBuffer & buf)
{
assert(record.type == WriteBatch::WriteType::REF);

writeIntBinary(record.type, buf);

UInt32 flags = 0;
writeIntBinary(flags, buf);
writeIntBinary(record.page_id, buf);
writeIntBinary(record.ori_page_id, buf);
serializeVersionTo(record.version, buf);
writeIntBinary(record.entry.file_id, buf);
writeIntBinary(record.entry.offset, buf);
writeIntBinary(record.entry.size, buf);
writeIntBinary(record.entry.checksum, buf);
// fieldsOffset TODO: compression on `fieldsOffset`
writeIntBinary(record.entry.field_offsets.size(), buf);
for (const auto & [off, checksum] : record.entry.field_offsets)
{
writeIntBinary(off, buf);
writeIntBinary(checksum, buf);
}
}

void deserializeRefFrom([[maybe_unused]] const WriteBatch::WriteType record_type, ReadBuffer & buf, PageEntriesEdit & edit)
{
assert(record_type == WriteBatch::WriteType::REF);

UInt32 flags = 0;
readIntBinary(flags, buf);
PageId page_id, ori_page_id;
readIntBinary(page_id, buf);
readIntBinary(ori_page_id, buf);
PageVersionType version;
deserializeVersionFrom(buf, version);
PageEntryV3 entry;
readIntBinary(entry.file_id, buf);
readIntBinary(entry.offset, buf);
readIntBinary(entry.size, buf);
readIntBinary(entry.checksum, buf);
// fieldsOffset
PageFieldOffsetChecksums field_offsets;
UInt64 size_field_offsets = 0;
readIntBinary(size_field_offsets, buf);
if (size_field_offsets != 0)
{
entry.field_offsets.reserve(size_field_offsets);
PageFieldOffset field_offset;
UInt64 field_checksum;
for (size_t i = 0; i < size_field_offsets; ++i)
{
readIntBinary(field_offset, buf);
readIntBinary(field_checksum, buf);
entry.field_offsets.emplace_back(field_offset, field_checksum);
}
}

// All consider as put
PageEntriesEdit::EditRecord rec;
rec.type = WriteBatch::WriteType::PUT;
rec.page_id = page_id;
rec.ori_page_id = ori_page_id;
rec.version = version;
rec.entry = entry;
edit.appendRecord(rec);
}


void serializePutExternalTo(const PageEntriesEdit::EditRecord & record, WriteBuffer & buf)
{
assert(record.type == WriteBatch::WriteType::PUT_EXTERNAL);
Expand Down Expand Up @@ -142,11 +211,15 @@ void deserializeFrom(ReadBuffer & buf, PageEntriesEdit & edit)
{
case WriteBatch::WriteType::PUT:
case WriteBatch::WriteType::UPSERT:
case WriteBatch::WriteType::REF:
{
deserializePutFrom(record_type, buf, edit);
break;
}
case WriteBatch::WriteType::REF:
{
deserializeRefFrom(record_type, buf, edit);
break;
}
case WriteBatch::WriteType::DEL:
{
deserializeDelFrom(record_type, buf, edit);
Expand Down Expand Up @@ -174,9 +247,11 @@ String serializeTo(const PageEntriesEdit & edit)
{
case DB::WriteBatch::WriteType::PUT:
case DB::WriteBatch::WriteType::UPSERT:
case DB::WriteBatch::WriteType::REF:
serializePutTo(record, buf);
break;
case DB::WriteBatch::WriteType::REF:
serializeRefTo(record, buf);
break;
case DB::WriteBatch::WriteType::DEL:
serializeDelTo(record, buf);
break;
Expand Down
91 changes: 91 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Storages/Page/V3/WAL/serialize.h>
#include <Storages/Page/V3/WALStore.h>
#include <Storages/Page/V3/tests/entries_helper.h>
#include <Storages/Page/WriteBatch.h>
#include <Storages/tests/TiFlashStorageTestBasic.h>
#include <TestUtils/MockDiskDelegator.h>
#include <TestUtils/TiFlashTestEnv.h>
Expand All @@ -30,6 +31,7 @@ TEST(WALSeriTest, AllPuts)
auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit));
ASSERT_EQ(deseri_edit.size(), 2);
auto iter = edit.getRecords().begin();
EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT);
EXPECT_EQ(iter->page_id, 1);
EXPECT_EQ(iter->version, ver20);
EXPECT_TRUE(isSameEntry(iter->entry, entry_p1));
Expand Down Expand Up @@ -61,14 +63,17 @@ try
auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit));
ASSERT_EQ(deseri_edit.size(), 6);
auto iter = edit.getRecords().begin();
EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT);
EXPECT_EQ(iter->page_id, 3);
EXPECT_EQ(iter->version, ver21);
EXPECT_TRUE(isSameEntry(iter->entry, entry_p3));
iter++;
EXPECT_EQ(iter->type, WriteBatch::WriteType::REF);
EXPECT_EQ(iter->page_id, 4);
EXPECT_EQ(iter->version, ver21);
EXPECT_TRUE(isSameEntry(iter->entry, entry_p3));
iter++;
EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT);
EXPECT_EQ(iter->page_id, 5);
EXPECT_EQ(iter->version, ver21);
EXPECT_TRUE(isSameEntry(iter->entry, entry_p5));
Expand All @@ -87,6 +92,92 @@ try
}
CATCH

TEST(WALSeriTest, PutExternalsAndRefsAndDels)
try
{
{
PageEntriesEdit edit;
edit.putExternal(10);
edit.ref(11, 10);
edit.ref(12, 10);
edit.del(10);

PageVersionType ver21(/*seq=*/21);
for (auto & rec : edit.getMutRecords())
rec.version = ver21;

auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit));
ASSERT_EQ(deseri_edit.size(), 4);
auto iter = edit.getRecords().begin();
EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT_EXTERNAL);
EXPECT_EQ(iter->page_id, 10);
EXPECT_EQ(iter->version, ver21);
iter++;
EXPECT_EQ(iter->type, WriteBatch::WriteType::REF);
EXPECT_EQ(iter->page_id, 11);
EXPECT_EQ(iter->ori_page_id, 10);
EXPECT_EQ(iter->version, ver21);
iter++;
EXPECT_EQ(iter->type, WriteBatch::WriteType::REF);
EXPECT_EQ(iter->page_id, 12);
EXPECT_EQ(iter->ori_page_id, 10);
EXPECT_EQ(iter->version, ver21);
iter++;
EXPECT_EQ(iter->type, WriteBatch::WriteType::DEL);
EXPECT_EQ(iter->page_id, 10);
EXPECT_EQ(iter->version, ver21);
}

{
PageEntriesEdit edit;
edit.del(11);
PageVersionType ver22(/*seq=*/22);
for (auto & rec : edit.getMutRecords())
rec.version = ver22;

auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit));
ASSERT_EQ(deseri_edit.size(), 1);
auto iter = edit.getRecords().begin();
EXPECT_EQ(iter->type, WriteBatch::WriteType::DEL);
EXPECT_EQ(iter->page_id, 11);
EXPECT_EQ(iter->version, ver22);
}

{
PageEntriesEdit edit;
edit.putExternal(10);
edit.ref(11, 10);
edit.ref(12, 11);
edit.del(10);

PageVersionType ver23(/*seq=*/23);
for (auto & rec : edit.getMutRecords())
rec.version = ver23;

auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit));
ASSERT_EQ(deseri_edit.size(), 4);
auto iter = edit.getRecords().begin();
EXPECT_EQ(iter->type, WriteBatch::WriteType::PUT_EXTERNAL);
EXPECT_EQ(iter->page_id, 10);
EXPECT_EQ(iter->version, ver23);
iter++;
EXPECT_EQ(iter->type, WriteBatch::WriteType::REF);
EXPECT_EQ(iter->page_id, 11);
EXPECT_EQ(iter->ori_page_id, 10);
EXPECT_EQ(iter->version, ver23);
iter++;
EXPECT_EQ(iter->type, WriteBatch::WriteType::REF);
EXPECT_EQ(iter->page_id, 12);
EXPECT_EQ(iter->ori_page_id, 11);
EXPECT_EQ(iter->version, ver23);
iter++;
EXPECT_EQ(iter->type, WriteBatch::WriteType::DEL);
EXPECT_EQ(iter->page_id, 10);
EXPECT_EQ(iter->version, ver23);
}
}
CATCH

TEST(WALSeriTest, Upserts)
{
PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567};
Expand Down

0 comments on commit 85c502a

Please sign in to comment.