Skip to content

Commit

Permalink
do some more rename
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Feb 7, 2022
1 parent aecd310 commit 13ac1e9
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 96 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ struct Settings
M(SettingUInt64, dt_segment_stop_write_delta_size, 2147483648, "Delta size before stop new writes. 2 GB by default.") \
M(SettingUInt64, dt_segment_delta_cache_limit_rows, 4096, "Max rows of cache in segment delta in DeltaTree Engine.") \
M(SettingUInt64, dt_segment_delta_cache_limit_size, 4194304, "Max size of cache in segment delta in DeltaTree Engine. 4 MB by default.") \
M(SettingUInt64, dt_segment_delta_small_pack_rows, 2048, "Determine whether a pack in delta is small or not.") \
M(SettingUInt64, dt_segment_delta_small_pack_size, 8388608, "Determine whether a pack in delta is small or not. 8MB by default.") \
M(SettingUInt64, dt_segment_delta_small_column_file_rows, 2048, "Determine whether a column file in delta is small or not.") \
M(SettingUInt64, dt_segment_delta_small_column_file_size, 8388608, "Determine whether a column file in delta is small or not. 8MB by default.") \
M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.") \
M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.") \
M(SettingUInt64, dt_bg_gc_check_interval, 600, "Background gc thread check interval, the unit is second.") \
Expand Down
90 changes: 47 additions & 43 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ std::pair<size_t, size_t> findColumnFile(const ColumnFiles & column_files, size_
if (deletes_count == deletes_offset)
{
if (unlikely(rows_count != rows_offset))
throw Exception("rows_count and rows_offset are expected to be equal. pack_index: " + DB::toString(column_file_index)
+ ", pack_size: " + DB::toString(column_files.size()) + ", rows_count: " + DB::toString(rows_count)
throw Exception("rows_count and rows_offset are expected to be equal. column_file_index: " + DB::toString(column_file_index)
+ ", column_file_size: " + DB::toString(column_files.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset));
return {column_file_index, 0};
Expand All @@ -39,8 +39,8 @@ std::pair<size_t, size_t> findColumnFile(const ColumnFiles & column_files, size_
if (rows_count > rows_offset)
{
if (unlikely(deletes_count != deletes_offset))
throw Exception("deletes_count and deletes_offset are expected to be equal. pack_index: " + DB::toString(column_file_index)
+ ", pack_size: " + DB::toString(column_files.size()) + ", rows_count: " + DB::toString(rows_count)
throw Exception("deletes_count and deletes_offset are expected to be equal. column_file_index: " + DB::toString(column_file_index)
+ ", column_file_size: " + DB::toString(column_files.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset),
ErrorCodes::LOGICAL_ERROR);
Expand All @@ -50,7 +50,7 @@ std::pair<size_t, size_t> findColumnFile(const ColumnFiles & column_files, size_
}
}
if (rows_count != rows_offset || deletes_count != deletes_offset)
throw Exception("illegal rows_offset and deletes_offset. pack_size: " + DB::toString(column_files.size())
throw Exception("illegal rows_offset and deletes_offset. column_file_size: " + DB::toString(column_files.size())
+ ", rows_count: " + DB::toString(rows_count) + ", rows_offset: " + DB::toString(rows_offset)
+ ", deletes_count: " + DB::toString(deletes_count) + ", deletes_offset: " + DB::toString(deletes_offset),
ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -127,75 +127,79 @@ size_t ColumnFileSetReader::readRows(MutableColumns & output_columns, size_t off
if (end == start)
return 0;

auto [start_pack_index, rows_start_in_start_pack] = locatePosByAccumulation(column_file_rows_end, start);
auto [end_pack_index, rows_end_in_end_pack] = locatePosByAccumulation(column_file_rows_end, end);
auto [start_file_index, rows_start_in_start_file] = locatePosByAccumulation(column_file_rows_end, start);
auto [end_file_index, rows_end_in_end_file] = locatePosByAccumulation(column_file_rows_end, end);

size_t actual_read = 0;
for (size_t pack_index = start_pack_index; pack_index <= end_pack_index; ++pack_index)
for (size_t file_index = start_file_index; file_index <= end_file_index; ++file_index)
{
size_t rows_start_in_pack = pack_index == start_pack_index ? rows_start_in_start_pack : 0;
size_t rows_end_in_pack = pack_index == end_pack_index ? rows_end_in_end_pack : column_file_rows[pack_index];
size_t rows_in_pack_limit = rows_end_in_pack - rows_start_in_pack;
size_t rows_start_in_file = file_index == start_file_index ? rows_start_in_start_file : 0;
size_t rows_end_in_file = file_index == end_file_index ? rows_end_in_end_file : column_file_rows[file_index];
size_t rows_in_file_limit = rows_end_in_file - rows_start_in_file;

// Nothing to read.
if (rows_start_in_pack == rows_end_in_pack)
if (rows_in_file_limit == 0)
continue;

auto & column_file_reader = column_file_readers[pack_index];
actual_read += column_file_reader->readRows(output_columns, rows_start_in_pack, rows_in_pack_limit, range);
auto & column_file_reader = column_file_readers[file_index];
actual_read += column_file_reader->readRows(output_columns, rows_start_in_file, rows_in_file_limit, range);
}
return actual_read;
}

void ColumnFileSetReader::getPlaceItems(BlockOrDeletes & place_items, size_t rows_begin, size_t deletes_begin, size_t rows_end, size_t deletes_end, size_t place_rows_offset)
{
/// Note that we merge the consecutive DeltaPackBlock together, which are separated in groups by DeltaPackDelete and DeltaPackFile.
/// Note that we merge the consecutive ColumnFileInMemory or ColumnFileTiny together, which are separated in groups by ColumnFileDeleteRange and ColumnFileBig.
auto & column_files = snapshot->getColumnFiles();

auto [start_pack_index, rows_start_in_start_pack] = findColumnFile(column_files, rows_begin, deletes_begin);
auto [end_pack_index, rows_end_in_end_pack] = findColumnFile(column_files, rows_end, deletes_end);
auto [start_file_index, rows_start_in_start_file] = findColumnFile(column_files, rows_begin, deletes_begin);
auto [end_file_index, rows_end_in_end_file] = findColumnFile(column_files, rows_end, deletes_end);

size_t block_rows_start = rows_begin;
size_t block_rows_end = rows_begin;

for (size_t pack_index = start_pack_index; pack_index < column_files.size() && pack_index <= end_pack_index; ++pack_index)
for (size_t file_index = start_file_index; file_index < column_files.size() && file_index <= end_file_index; ++file_index)
{
auto & pack = *column_files[pack_index];
auto & column_file = *column_files[file_index];

if (pack.isDeleteRange() || pack.isBigFile())
if (column_file.isDeleteRange() || column_file.isBigFile())
{
// First, compact the DeltaPackBlocks before this pack into one block.
// First, compact the ColumnFileInMemory or ColumnFileTiny before this column file into one block.
if (block_rows_end != block_rows_start)
{
auto block = readPKVersion(block_rows_start, block_rows_end - block_rows_start);
place_items.emplace_back(std::move(block), block_rows_start + place_rows_offset);
}

// Second, take current pack.
if (auto * pack_delete = pack.tryToDeleteRange(); pack_delete)
// Second, take current column file.
if (auto * dr = column_file.tryToDeleteRange(); dr)
{
place_items.emplace_back(pack_delete->getDeleteRange());
place_items.emplace_back(dr->getDeleteRange());
}
else if (pack.isBigFile() && pack.getRows())
else if (column_file.isBigFile() && column_file.getRows())
{
auto block = readPKVersion(block_rows_end, pack.getRows());
auto block = readPKVersion(block_rows_end, column_file.getRows());
place_items.emplace_back(std::move(block), block_rows_end + place_rows_offset);
}
else
{
throw Exception("Unknown column file type", ErrorCodes::LOGICAL_ERROR);
}

block_rows_end += pack.getRows();
block_rows_end += column_file.getRows();
block_rows_start = block_rows_end;
}
else
{
// It is a DeltaPackBlock.
size_t rows_start_in_pack = pack_index == start_pack_index ? rows_start_in_start_pack : 0;
size_t rows_end_in_pack = pack_index == end_pack_index ? rows_end_in_end_pack : pack.getRows();
// It is a ColumnFileInMemory or ColumnFileTiny.
size_t rows_start_in_file = file_index == start_file_index ? rows_start_in_start_file : 0;
size_t rows_end_in_file = file_index == end_file_index ? rows_end_in_end_file : column_file.getRows();

block_rows_end += rows_end_in_pack - rows_start_in_pack;
block_rows_end += rows_end_in_file - rows_start_in_file;

if (pack_index == column_files.size() - 1 || pack_index == end_pack_index)
if (file_index == column_files.size() - 1 || file_index == end_file_index)
{
// It is the last pack.
// It is the last column file.
if (block_rows_end != block_rows_start)
{
auto block = readPKVersion(block_rows_start, block_rows_end - block_rows_start);
Expand All @@ -213,22 +217,22 @@ bool ColumnFileSetReader::shouldPlace(const DMContext & context,
size_t placed_rows)
{
auto & column_files = snapshot->getColumnFiles();
auto [start_pack_index, rows_start_in_start_pack] = locatePosByAccumulation(column_file_rows_end, placed_rows);
auto [start_file_index, rows_start_in_start_file] = locatePosByAccumulation(column_file_rows_end, placed_rows);

for (size_t pack_index = start_pack_index; pack_index < snapshot->getColumnFileCount(); ++pack_index)
for (size_t file_index = start_file_index; file_index < snapshot->getColumnFileCount(); ++file_index)
{
auto & column_file = column_files[pack_index];
auto & column_file = column_files[file_index];

// Always do place index if DeltaPackFile exists.
// Always do place index if ColumnFileBig exists.
if (column_file->isBigFile())
return true;
if (unlikely(column_file->isDeleteRange()))
throw Exception("pack is delete range", ErrorCodes::LOGICAL_ERROR);
throw Exception("column file is delete range", ErrorCodes::LOGICAL_ERROR);

size_t rows_start_in_pack = pack_index == start_pack_index ? rows_start_in_start_pack : 0;
size_t rows_end_in_pack = column_file_rows[pack_index];
size_t rows_start_in_file = file_index == start_file_index ? rows_start_in_start_file : 0;
size_t rows_end_in_file = column_file_rows[file_index];

auto & column_file_reader = column_file_readers[pack_index];
auto & column_file_reader = column_file_readers[file_index];
if (column_file->isInMemoryFile())
{
auto & dpb_reader = typeid_cast<ColumnFileInMemoryReader &>(*column_file_reader);
Expand All @@ -238,7 +242,7 @@ bool ColumnFileSetReader::shouldPlace(const DMContext & context,
auto rkcc = RowKeyColumnContainer(pk_column, context.is_common_handle);
const auto & version_col_data = toColumnVectorData<UInt64>(version_column);

for (auto i = rows_start_in_pack; i < rows_end_in_pack; ++i)
for (auto i = rows_start_in_file; i < rows_end_in_file; ++i)
{
if (version_col_data[i] <= max_version && relevant_range.check(rkcc.getRowKeyValue(i)))
return true;
Expand All @@ -253,7 +257,7 @@ bool ColumnFileSetReader::shouldPlace(const DMContext & context,
auto rkcc = RowKeyColumnContainer(pk_column, context.is_common_handle);
const auto & version_col_data = toColumnVectorData<UInt64>(version_column);

for (auto i = rows_start_in_pack; i < rows_end_in_pack; ++i)
for (auto i = rows_start_in_file; i < rows_end_in_file; ++i)
{
if (version_col_data[i] <= max_version && relevant_range.check(rkcc.getRowKeyValue(i)))
return true;
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ColumnFileSetInputStream : public IBlockInputStream
size_t column_files_count;

ColumnFileReaderPtr cur_column_file_reader = {};
size_t next_pack_index = 0;
size_t next_file_index = 0;

public:
ColumnFileSetInputStream(const DMContext & context_,
Expand All @@ -76,19 +76,19 @@ class ColumnFileSetInputStream : public IBlockInputStream

Block read() override
{
while (cur_column_file_reader || next_pack_index < column_files_count)
while (cur_column_file_reader || next_file_index < column_files_count)
{
if (!cur_column_file_reader)
{
if (column_files[next_pack_index]->isDeleteRange())
if (column_files[next_file_index]->isDeleteRange())
{
++next_pack_index;
++next_file_index;
continue;
}
else
{
cur_column_file_reader = reader.column_file_readers[next_pack_index];
++next_pack_index;
cur_column_file_reader = reader.column_file_readers[next_file_index];
++next_file_index;
}
}
Block block = cur_column_file_reader->readNextBlock();
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ struct DMContext : private boost::noncopyable
const size_t delta_cache_limit_rows;
// The size threshold of cache in delta.
const size_t delta_cache_limit_bytes;
// Determine whether a pack is small or not in rows.
const size_t delta_small_pack_rows;
// Determine whether a pack is small or not in bytes.
const size_t delta_small_pack_bytes;
// Determine whether a column file is small or not in rows.
const size_t delta_small_column_file_rows;
// Determine whether a column file is small or not in bytes.
const size_t delta_small_column_file_bytes;
// The expected stable pack rows.
const size_t stable_pack_rows;

Expand Down Expand Up @@ -95,8 +95,8 @@ struct DMContext : private boost::noncopyable
, delta_limit_bytes(settings.dt_segment_delta_limit_size)
, delta_cache_limit_rows(settings.dt_segment_delta_cache_limit_rows)
, delta_cache_limit_bytes(settings.dt_segment_delta_cache_limit_size)
, delta_small_pack_rows(settings.dt_segment_delta_small_pack_rows)
, delta_small_pack_bytes(settings.dt_segment_delta_small_pack_size)
, delta_small_column_file_rows(settings.dt_segment_delta_small_column_file_rows)
, delta_small_column_file_bytes(settings.dt_segment_delta_small_column_file_size)
, stable_pack_rows(settings.dt_segment_stable_pack_rows)
, enable_logical_split(settings.dt_enable_logical_split)
, read_delta_only(settings.dt_read_delta_only)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ bool ColumnFileFlushTask::commit(ColumnFilePersistedSetPtr & persisted_file_set,
{
// Just keep cache for really small column file
ColumnFile::CachePtr column_file_cache = nullptr;
if (m_file->getRows() < context.delta_small_pack_rows || m_file->getBytes() < context.delta_small_pack_bytes)
if (m_file->getRows() < context.delta_small_column_file_rows || m_file->getBytes() < context.delta_small_column_file_bytes)
{
column_file_cache = !task.sorted ? m_file->getCache() : std::make_shared<ColumnFile::Cache>(std::move(task.block_data));
}
Expand Down
Loading

0 comments on commit 13ac1e9

Please sign in to comment.