From be5bacb01758bddcd6b5f9b86c1e9b83ca59c0f6 Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Mon, 30 Jan 2023 13:45:53 +0800 Subject: [PATCH] Add back ingest file range check (#6538) ref pingcap/tiflash#6507 --- .../DeltaMerge/DeltaMergeStore_Ingest.cpp | 23 ++++---- dbms/src/Storages/DeltaMerge/RowKeyRange.h | 21 ++++++- .../SSTFilesToDTFilesOutputStream.cpp | 2 +- .../DeltaMerge/tests/gtest_dm_ingest.cpp | 58 +++++++++---------- .../DeltaMerge/tests/gtest_key_range.cpp | 50 ++++++++++++++++ 5 files changed, 111 insertions(+), 43 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index b68a7dab299..98b072f5b83 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -538,18 +538,17 @@ void DeltaMergeStore::ingestFiles( } // Check whether all external files are contained by the range. - // Currently this check is disabled, see https://github.com/pingcap/tiflash/pull/6519 - // for (const auto & ext_file : external_files) - // { - // RUNTIME_CHECK( - // compare(range.getStart(), ext_file.range.getStart()) <= 0, - // range.toDebugString(), - // ext_file.range.toDebugString()); - // RUNTIME_CHECK( - // compare(range.getEnd(), ext_file.range.getEnd()) >= 0, - // range.toDebugString(), - // ext_file.range.toDebugString()); - // } + for (const auto & ext_file : external_files) + { + RUNTIME_CHECK( + compare(range.getStart(), ext_file.range.getStart()) <= 0, + range.toDebugString(), + ext_file.range.toDebugString()); + RUNTIME_CHECK( + compare(range.getEnd(), ext_file.range.getEnd()) >= 0, + range.toDebugString(), + ext_file.range.toDebugString()); + } } EventRecorder write_block_recorder(ProfileEvents::DMWriteFile, ProfileEvents::DMWriteFileNS); diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRange.h b/dbms/src/Storages/DeltaMerge/RowKeyRange.h index 5b6d2db0210..acbcda2ec73 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRange.h +++ b/dbms/src/Storages/DeltaMerge/RowKeyRange.h @@ -135,7 +135,11 @@ struct RowKeyValue return is_common_handle == v.is_common_handle && (*value) == (*v.value) && int_value == v.int_value; } - RowKeyValue toPrefixNext() + /** + * Returns the key so that the range [this, this.toPrefixNext()) contains + * all keys with the prefix `this`. + */ + RowKeyValue toPrefixNext() const { std::vector keys(value->begin(), value->end()); int index = keys.size() - 1; @@ -164,6 +168,21 @@ struct RowKeyValue return RowKeyValue(is_common_handle, prefix_value, prefix_int_value); } + /** + * Returns the smallest row key which is larger than the current row key. + */ + RowKeyValue toNext() const + { + HandleValuePtr next_value = std::make_shared(value->begin(), value->end()); + next_value->push_back(0x0); + + Int64 next_int_value = int_value; + if (!is_common_handle && next_int_value != std::numeric_limits::max()) + next_int_value++; + + return RowKeyValue(is_common_handle, next_value, next_int_value); + } + void serialize(WriteBuffer & buf) const { writeBoolText(is_common_handle, buf); diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp index 40134e2f370..bb4ccfdf049 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp @@ -324,7 +324,7 @@ void SSTFilesToDTFilesOutputStream::updateRangeFromNonEmptyBlock(Bl auto const block_start = rowkey_column.getRowKeyValue(0); auto const block_end = rowkey_column.getRowKeyValue(pk_col.column->size() - 1) // .toRowKeyValue() - .toPrefixNext(); // because range is right-open. + .toNext(); // because range is right-open. // Note: The underlying stream ensures that one row key will not fall into two blocks (when there are multiple versions). // So we will never have overlapped range. diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp index c6a426fc79b..2c04541be4e 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp @@ -78,35 +78,35 @@ try } CATCH -//TEST_P(StoreIngestTest, RangeSmallerThanData) -//try -//{ -// ASSERT_EQ(0, getRowsN()); -// auto block1 = fillBlock({.range = {0, 100}}); -// ASSERT_THROW({ -// ingestFiles({.range = {20, 40}, .blocks = {block1}, .clear = false}); -// }, -// DB::Exception); -//} -//CATCH -// -//TEST_P(StoreIngestTest, RangeLargerThanData) -//try -//{ -// ASSERT_EQ(0, getRowsN()); -// auto block1 = fillBlock({.range = {0, 100}}); -// ingestFiles({.range = {-100, 110}, .blocks = {block1}, .clear = false}); -// ASSERT_TRUE(isFilled(0, 100)); -// ASSERT_EQ(100, getRowsN()); -// -// fill(-500, 500); -// ingestFiles({.range = {-100, 110}, .blocks = {block1}, .clear = true}); -// ASSERT_TRUE(isFilled(-500, -100)); -// ASSERT_TRUE(isFilled(0, 100)); -// ASSERT_TRUE(isFilled(110, 500)); -// ASSERT_EQ(890, getRowsN()); -//} -//CATCH +TEST_P(StoreIngestTest, RangeSmallerThanData) +try +{ + ASSERT_EQ(0, getRowsN()); + auto block1 = fillBlock({.range = {0, 100}}); + ASSERT_THROW({ + ingestFiles({.range = {20, 40}, .blocks = {block1}, .clear = false}); + }, + DB::Exception); +} +CATCH + +TEST_P(StoreIngestTest, RangeLargerThanData) +try +{ + ASSERT_EQ(0, getRowsN()); + auto block1 = fillBlock({.range = {0, 100}}); + ingestFiles({.range = {-100, 110}, .blocks = {block1}, .clear = false}); + ASSERT_TRUE(isFilled(0, 100)); + ASSERT_EQ(100, getRowsN()); + + fill(-500, 500); + ingestFiles({.range = {-100, 110}, .blocks = {block1}, .clear = true}); + ASSERT_TRUE(isFilled(-500, -100)); + ASSERT_TRUE(isFilled(0, 100)); + ASSERT_TRUE(isFilled(110, 500)); + ASSERT_EQ(890, getRowsN()); +} +CATCH TEST_P(StoreIngestTest, OverlappedFiles) try diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_key_range.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_key_range.cpp index 802c047a1d7..40a0cf782e6 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_key_range.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_key_range.cpp @@ -98,6 +98,56 @@ TEST(RowKeyRange_test, RedactRangeFromCommonHandle) Redact::setRedactLog(false); // restore flags } +TEST(RowKey, ToNextKeyIntHandle) +{ + const auto key = RowKeyValue::fromHandle(20); + const auto next = key.toNext(); + EXPECT_EQ("21", next.toDebugString()); + + { + const auto expected_next_int = RowKeyValue::fromHandle(21); + EXPECT_EQ(0, compare(next.toRowKeyValueRef(), expected_next_int.toRowKeyValueRef())); + } + { + const auto range_keys = std::make_shared( + RecordKVFormat::genKey(1, 0), + RecordKVFormat::genKey(1, 21)); + const auto range = RowKeyRange::fromRegionRange( + range_keys, + /* table_id */ 1, + /* is_common_handle */ false, + /* row_key_column_size */ 1); + EXPECT_EQ(0, compare(next.toRowKeyValueRef(), range.getEnd())); + } + // Note: The following does not work, because {20,00} will be regarded as Key=20 in RowKeyRange::fromRegionRange. + // { + // auto key_end = RecordKVFormat::genRawKey(1, 20); + // key_end.push_back(0); + // auto tikv_key_end = RecordKVFormat::encodeAsTiKVKey(key_end); + // const auto range_keys = std::make_shared( + // RecordKVFormat::genKey(1, 0), + // std::move(tikv_key_end)); + // const auto range = RowKeyRange::fromRegionRange( + // range_keys, + // /* table_id */ 1, + // /* is_common_handle */ false, + // /* row_key_column_size */ 1); + // EXPECT_EQ(0, compare(next.toRowKeyValueRef(), range.getEnd())); + // } +} + +TEST(RowKey, ToNextKeyCommonHandle) +{ + using namespace std::literals::string_literals; + + const auto key = RowKeyValue(/* is_common_handle */ true, std::make_shared("\xcc\xab"s), 0); + const auto next = key.toNext(); + EXPECT_EQ("CCAB00", next.toDebugString()); + + const auto my_next = RowKeyValue(/* is_common_handle */ true, std::make_shared("\xcc\xab\x00"s), 0); + EXPECT_EQ(0, compare(my_next.toRowKeyValueRef(), next.toRowKeyValueRef())); +} + } // namespace tests } // namespace DM } // namespace DB