Skip to content

Commit

Permalink
Add back ingest file range check (#6538)
Browse files Browse the repository at this point in the history
ref #6507
  • Loading branch information
breezewish authored Jan 30, 2023
1 parent 0b1ffce commit be5bacb
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 43 deletions.
23 changes: 11 additions & 12 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,18 +538,17 @@ void DeltaMergeStore::ingestFiles(
}

// Check whether all external files are contained by the range.
// Currently this check is disabled, see https://github.com/pingcap/tiflash/pull/6519
// for (const auto & ext_file : external_files)
// {
// RUNTIME_CHECK(
// compare(range.getStart(), ext_file.range.getStart()) <= 0,
// range.toDebugString(),
// ext_file.range.toDebugString());
// RUNTIME_CHECK(
// compare(range.getEnd(), ext_file.range.getEnd()) >= 0,
// range.toDebugString(),
// ext_file.range.toDebugString());
// }
for (const auto & ext_file : external_files)
{
RUNTIME_CHECK(
compare(range.getStart(), ext_file.range.getStart()) <= 0,
range.toDebugString(),
ext_file.range.toDebugString());
RUNTIME_CHECK(
compare(range.getEnd(), ext_file.range.getEnd()) >= 0,
range.toDebugString(),
ext_file.range.toDebugString());
}
}

EventRecorder write_block_recorder(ProfileEvents::DMWriteFile, ProfileEvents::DMWriteFileNS);
Expand Down
21 changes: 20 additions & 1 deletion dbms/src/Storages/DeltaMerge/RowKeyRange.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ struct RowKeyValue
return is_common_handle == v.is_common_handle && (*value) == (*v.value) && int_value == v.int_value;
}

RowKeyValue toPrefixNext()
/**
* Returns the key so that the range [this, this.toPrefixNext()) contains
* all keys with the prefix `this`.
*/
RowKeyValue toPrefixNext() const
{
std::vector<UInt8> keys(value->begin(), value->end());
int index = keys.size() - 1;
Expand Down Expand Up @@ -164,6 +168,21 @@ struct RowKeyValue
return RowKeyValue(is_common_handle, prefix_value, prefix_int_value);
}

/**
* Returns the smallest row key which is larger than the current row key.
*/
RowKeyValue toNext() const
{
HandleValuePtr next_value = std::make_shared<String>(value->begin(), value->end());
next_value->push_back(0x0);

Int64 next_int_value = int_value;
if (!is_common_handle && next_int_value != std::numeric_limits<Int64>::max())
next_int_value++;

return RowKeyValue(is_common_handle, next_value, next_int_value);
}

void serialize(WriteBuffer & buf) const
{
writeBoolText(is_common_handle, buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ void SSTFilesToDTFilesOutputStream<ChildStream>::updateRangeFromNonEmptyBlock(Bl
auto const block_start = rowkey_column.getRowKeyValue(0);
auto const block_end = rowkey_column.getRowKeyValue(pk_col.column->size() - 1) //
.toRowKeyValue()
.toPrefixNext(); // because range is right-open.
.toNext(); // because range is right-open.

// Note: The underlying stream ensures that one row key will not fall into two blocks (when there are multiple versions).
// So we will never have overlapped range.
Expand Down
58 changes: 29 additions & 29 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,35 +78,35 @@ try
}
CATCH

//TEST_P(StoreIngestTest, RangeSmallerThanData)
//try
//{
// ASSERT_EQ(0, getRowsN());
// auto block1 = fillBlock({.range = {0, 100}});
// ASSERT_THROW({
// ingestFiles({.range = {20, 40}, .blocks = {block1}, .clear = false});
// },
// DB::Exception);
//}
//CATCH
//
//TEST_P(StoreIngestTest, RangeLargerThanData)
//try
//{
// ASSERT_EQ(0, getRowsN());
// auto block1 = fillBlock({.range = {0, 100}});
// ingestFiles({.range = {-100, 110}, .blocks = {block1}, .clear = false});
// ASSERT_TRUE(isFilled(0, 100));
// ASSERT_EQ(100, getRowsN());
//
// fill(-500, 500);
// ingestFiles({.range = {-100, 110}, .blocks = {block1}, .clear = true});
// ASSERT_TRUE(isFilled(-500, -100));
// ASSERT_TRUE(isFilled(0, 100));
// ASSERT_TRUE(isFilled(110, 500));
// ASSERT_EQ(890, getRowsN());
//}
//CATCH
TEST_P(StoreIngestTest, RangeSmallerThanData)
try
{
ASSERT_EQ(0, getRowsN());
auto block1 = fillBlock({.range = {0, 100}});
ASSERT_THROW({
ingestFiles({.range = {20, 40}, .blocks = {block1}, .clear = false});
},
DB::Exception);
}
CATCH

TEST_P(StoreIngestTest, RangeLargerThanData)
try
{
ASSERT_EQ(0, getRowsN());
auto block1 = fillBlock({.range = {0, 100}});
ingestFiles({.range = {-100, 110}, .blocks = {block1}, .clear = false});
ASSERT_TRUE(isFilled(0, 100));
ASSERT_EQ(100, getRowsN());

fill(-500, 500);
ingestFiles({.range = {-100, 110}, .blocks = {block1}, .clear = true});
ASSERT_TRUE(isFilled(-500, -100));
ASSERT_TRUE(isFilled(0, 100));
ASSERT_TRUE(isFilled(110, 500));
ASSERT_EQ(890, getRowsN());
}
CATCH

TEST_P(StoreIngestTest, OverlappedFiles)
try
Expand Down
50 changes: 50 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_key_range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,56 @@ TEST(RowKeyRange_test, RedactRangeFromCommonHandle)
Redact::setRedactLog(false); // restore flags
}

TEST(RowKey, ToNextKeyIntHandle)
{
const auto key = RowKeyValue::fromHandle(20);
const auto next = key.toNext();
EXPECT_EQ("21", next.toDebugString());

{
const auto expected_next_int = RowKeyValue::fromHandle(21);
EXPECT_EQ(0, compare(next.toRowKeyValueRef(), expected_next_int.toRowKeyValueRef()));
}
{
const auto range_keys = std::make_shared<RegionRangeKeys>(
RecordKVFormat::genKey(1, 0),
RecordKVFormat::genKey(1, 21));
const auto range = RowKeyRange::fromRegionRange(
range_keys,
/* table_id */ 1,
/* is_common_handle */ false,
/* row_key_column_size */ 1);
EXPECT_EQ(0, compare(next.toRowKeyValueRef(), range.getEnd()));
}
// Note: The following does not work, because {20,00} will be regarded as Key=20 in RowKeyRange::fromRegionRange.
// {
// auto key_end = RecordKVFormat::genRawKey(1, 20);
// key_end.push_back(0);
// auto tikv_key_end = RecordKVFormat::encodeAsTiKVKey(key_end);
// const auto range_keys = std::make_shared<RegionRangeKeys>(
// RecordKVFormat::genKey(1, 0),
// std::move(tikv_key_end));
// const auto range = RowKeyRange::fromRegionRange(
// range_keys,
// /* table_id */ 1,
// /* is_common_handle */ false,
// /* row_key_column_size */ 1);
// EXPECT_EQ(0, compare(next.toRowKeyValueRef(), range.getEnd()));
// }
}

TEST(RowKey, ToNextKeyCommonHandle)
{
using namespace std::literals::string_literals;

const auto key = RowKeyValue(/* is_common_handle */ true, std::make_shared<String>("\xcc\xab"s), 0);
const auto next = key.toNext();
EXPECT_EQ("CCAB00", next.toDebugString());

const auto my_next = RowKeyValue(/* is_common_handle */ true, std::make_shared<String>("\xcc\xab\x00"s), 0);
EXPECT_EQ(0, compare(my_next.toRowKeyValueRef(), next.toRowKeyValueRef()));
}

} // namespace tests
} // namespace DM
} // namespace DB

0 comments on commit be5bacb

Please sign in to comment.