Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate storage delta layer to multiple layers #3943

Merged
merged 37 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
54fecd9
add ColumnFileSetReader
lidezhu Jan 25, 2022
dc512e2
add MemTableSet and ColumnFilePersistedSet
lidezhu Jan 25, 2022
9e2d932
seprate delta layer to two layers
lidezhu Jan 26, 2022
39ebd23
do some rename
lidezhu Jan 26, 2022
a84be26
format
lidezhu Jan 26, 2022
25b51ed
refactor flush process
lidezhu Jan 26, 2022
c5dfd47
do some rename
lidezhu Jan 26, 2022
a38327e
fix getLastSchema
lidezhu Jan 26, 2022
67e11a5
fix minor compaction
lidezhu Jan 27, 2022
944e293
add minor compaction version check
lidezhu Jan 27, 2022
60789ef
more rename
lidezhu Jan 28, 2022
b91fd4f
improve log
lidezhu Jan 28, 2022
db44d72
Merge branch 'master' into refactor-delta-tree2
lidezhu Jan 28, 2022
59f3004
Merge branch 'master' into refactor-delta-tree2
lidezhu Jan 28, 2022
e5197af
Rename and fix static analysis error
lidezhu Feb 7, 2022
ebc3259
Merge branch 'master' into refactor-delta-tree2
lidezhu Feb 7, 2022
aca8b97
Fix gtests
lidezhu Feb 7, 2022
b3dccbc
Merge branch 'master' into refactor-delta-tree2
lidezhu Feb 7, 2022
9c65453
Add more log and check for minor compaction
lidezhu Feb 8, 2022
40db952
Merge branch 'refactor-delta-tree2' of github.com:lidezhu/tics into r…
lidezhu Feb 8, 2022
44a4055
Merge branch 'master' into refactor-delta-tree2
lidezhu Feb 8, 2022
de4800c
add gtest for delta value space
lidezhu Feb 10, 2022
4fa449f
Merge branch 'master' into refactor-delta-tree2
lidezhu Feb 10, 2022
0ef3617
small improvement on code comment
lidezhu Feb 10, 2022
ec499d7
remove some debug log
lidezhu Feb 10, 2022
1b78626
Merge branch 'master' into refactor-delta-tree2
lidezhu Feb 11, 2022
8907fae
Merge branch 'master' into refactor-delta-tree2
lidezhu Feb 11, 2022
78258d4
do some more rename
lidezhu Feb 14, 2022
17bbc72
Merge branch 'master' into refactor-delta-tree2
lidezhu Feb 15, 2022
4711489
fix gtest for config rename
lidezhu Feb 15, 2022
d103a9c
Merge branch 'refactor-delta-tree2' of github.com:lidezhu/tics into r…
lidezhu Feb 15, 2022
be37f9e
Merge branch 'master' into refactor-delta-tree2
lidezhu Feb 15, 2022
5d00c12
fix DeltaValueReader::shouldPlace
lidezhu Feb 16, 2022
5e3e40c
Update dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
lidezhu Feb 16, 2022
dc960cd
use `column_file` in config instead `pack`
lidezhu Feb 16, 2022
2806d40
Merge branch 'refactor-delta-tree2' of github.com:lidezhu/tics into r…
lidezhu Feb 16, 2022
2850b36
Merge branch 'master' into refactor-delta-tree2
lidezhu Feb 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,10 @@ struct Settings
M(SettingUInt64, dt_segment_stop_write_delta_size, 2147483648, "Delta size before stop new writes. 2 GB by default.") \
M(SettingUInt64, dt_segment_delta_cache_limit_rows, 4096, "Max rows of cache in segment delta in DeltaTree Engine.") \
M(SettingUInt64, dt_segment_delta_cache_limit_size, 4194304, "Max size of cache in segment delta in DeltaTree Engine. 4 MB by default.") \
M(SettingUInt64, dt_segment_delta_small_pack_rows, 2048, "Determine whether a pack in delta is small or not.") \
M(SettingUInt64, dt_segment_delta_small_pack_size, 8388608, "Determine whether a pack in delta is small or not. 8MB by default.") \
M(SettingUInt64, dt_segment_delta_small_pack_rows, 2048, "Deprecated. Reserved for backward compatibility. Use dt_segment_delta_small_column_file_rows instead") \
M(SettingUInt64, dt_segment_delta_small_pack_size, 8388608, "Deprecated. Reserved for backward compatibility. Use dt_segment_delta_small_column_file_size instead") \
M(SettingUInt64, dt_segment_delta_small_column_file_rows, 2048, "Determine whether a column file in delta is small or not. 8MB by default.") \
M(SettingUInt64, dt_segment_delta_small_column_file_size, 8388608, "Determine whether a column file in delta is small or not. 8MB by default.") \
M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.") \
M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.") \
M(SettingUInt64, dt_bg_gc_check_interval, 600, "Background gc thread check interval, the unit is second.") \
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ dt_storage_pool_data_gc_max_valid_rate = 0.5
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_file_num, 8);
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_legacy_num, 2);
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_bytes, 256);
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_delta_small_pack_size, 8388608);
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_delta_small_pack_rows, 2048);
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_delta_small_column_file_size, 8388608);
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_delta_small_column_file_rows, 2048);
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_limit_size, 536870912);
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_delta_limit_size, 42991616);
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_force_merge_delta_size, 1073741824);
Expand Down
16 changes: 10 additions & 6 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/RowKeyFilter.h>

Expand Down Expand Up @@ -82,20 +83,20 @@ ColumnFileBig * ColumnFile::tryToBigFile()
return !isBigFile() ? nullptr : static_cast<ColumnFileBig *>(this);
}

String columnFilesToString(const ColumnFiles & column_files)
template <class T>
String columnFilesToString(const T & column_files)
{
String column_files_info = "[";
for (const auto & f : column_files)
{
if (f->isInMemoryFile())
column_files_info += "B_" + DB::toString(f->getRows());
column_files_info += "M_" + DB::toString(f->getRows()) + ",";
else if (f->isTinyFile())
column_files_info += "B_" + DB::toString(f->getRows());
column_files_info += "T_" + DB::toString(f->getRows()) + ",";
else if (f->isBigFile())
column_files_info += "F_" + DB::toString(f->getRows());
column_files_info += "F_" + DB::toString(f->getRows()) + ",";
else if (auto * f_delete = f->tryToDeleteRange(); f_delete)
column_files_info += "D_" + f_delete->getDeleteRange().toString();
column_files_info += (f->isSaved() ? "_S," : "_N,");
column_files_info += "D_" + f_delete->getDeleteRange().toString() + ",";
}

if (!column_files.empty())
Expand All @@ -104,5 +105,8 @@ String columnFilesToString(const ColumnFiles & column_files)
return column_files_info;
}

template String columnFilesToString<ColumnFiles>(const ColumnFiles & column_files);
template String columnFilesToString<ColumnFilePersisteds>(const ColumnFilePersisteds & column_files);

} // namespace DM
} // namespace DB
13 changes: 2 additions & 11 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ class ColumnFile
public:
/// This id is only used to to do equal check in DeltaValueSpace::checkHeadAndCloneTail.
UInt64 getId() const { return id; }
/// This column file is already saved to disk or not. Only saved packs can be recovered after reboot.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like ColumnFile::saved is useless?

Copy link
Contributor Author

@lidezhu lidezhu Feb 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Because all column files in ColumnFilePersistedSet are saved, and column files in MemTableSet are not saved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So let's remove it.

/// "saved" can only be true, after the content data and the metadata are all written to disk.
bool isSaved() const { return saved; }
void setSaved() { saved = true; }

virtual size_t getRows() const { return 0; }
virtual size_t getBytes() const { return 0; };
Expand Down Expand Up @@ -111,12 +107,6 @@ class ColumnFile
throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR);
}

/// Put the data's page id into the corresponding WriteBatch.
/// The actual remove will be done later.
virtual void removeData(WriteBatches &) const {};

virtual void serializeMetadata(WriteBuffer & buf, bool save_schema) const = 0;

virtual String toString() const = 0;
};

Expand Down Expand Up @@ -152,6 +142,7 @@ size_t copyColumnsData(


/// Debugging string
String columnFilesToString(const ColumnFiles & column_files);
template <typename T>
String columnFilesToString(const T & column_files);
} // namespace DM
} // namespace DB
26 changes: 13 additions & 13 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void ColumnFileBig::calculateStat(const DMContext & context)
ColumnFileReaderPtr
ColumnFileBig::getReader(const DMContext & context, const StorageSnapshotPtr & /*storage_snap*/, const ColumnDefinesPtr & col_defs) const
{
return std::make_shared<ColumnBigFileReader>(context, *this, col_defs);
return std::make_shared<ColumnFileBigReader>(context, *this, col_defs);
}

void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const
Expand All @@ -40,9 +40,9 @@ void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) c
writeIntBinary(valid_bytes, buf);
}

ColumnFilePtr ColumnFileBig::deserializeMetadata(DMContext & context, //
const RowKeyRange & segment_range,
ReadBuffer & buf)
ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(DMContext & context, //
const RowKeyRange & segment_range,
ReadBuffer & buf)
{
UInt64 file_ref_id;
size_t valid_rows, valid_bytes;
Expand All @@ -56,11 +56,11 @@ ColumnFilePtr ColumnFileBig::deserializeMetadata(DMContext & context, //

auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_ref_id, file_parent_path, DMFile::ReadMetaMode::all());

auto dp_file = new ColumnFileBig(dmfile, valid_rows, valid_bytes, segment_range);
auto * dp_file = new ColumnFileBig(dmfile, valid_rows, valid_bytes, segment_range);
return std::shared_ptr<ColumnFileBig>(dp_file);
}

void ColumnBigFileReader::initStream()
void ColumnFileBigReader::initStream()
{
if (file_stream)
return;
Expand Down Expand Up @@ -94,7 +94,7 @@ void ColumnBigFileReader::initStream()
}
}

size_t ColumnBigFileReader::readRowsRepeatedly(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range)
size_t ColumnFileBigReader::readRowsRepeatedly(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range)
{
if (unlikely(rows_offset + rows_limit > column_file.valid_rows))
throw Exception("Try to read more rows", ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -126,13 +126,13 @@ size_t ColumnBigFileReader::readRowsRepeatedly(MutableColumns & output_cols, siz
return actual_read;
}

size_t ColumnBigFileReader::readRowsOnce(MutableColumns & output_cols, //
size_t ColumnFileBigReader::readRowsOnce(MutableColumns & output_cols, //
size_t rows_offset,
size_t rows_limit,
const RowKeyRange * range)
{
auto read_next_block = [&, this]() -> bool {
rows_before_cur_block += ((bool)cur_block) ? cur_block.rows() : 0;
rows_before_cur_block += (static_cast<bool>(cur_block)) ? cur_block.rows() : 0;
cur_block_data.clear();

cur_block = file_stream->read();
Expand Down Expand Up @@ -186,7 +186,7 @@ size_t ColumnBigFileReader::readRowsOnce(MutableColumns & output_cols, //
return actual_read;
}

size_t ColumnBigFileReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range)
size_t ColumnFileBigReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range)
{
initStream();

Expand All @@ -204,17 +204,17 @@ size_t ColumnBigFileReader::readRows(MutableColumns & output_cols, size_t rows_o
}
}

Block ColumnBigFileReader::readNextBlock()
Block ColumnFileBigReader::readNextBlock()
{
initStream();

return file_stream->read();
}

ColumnFileReaderPtr ColumnBigFileReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
{
// Currently we don't reuse the cache data.
return std::make_shared<ColumnBigFileReader>(context, column_file, new_col_defs);
return std::make_shared<ColumnFileBigReader>(context, column_file, new_col_defs);
}

} // namespace DM
Expand Down
13 changes: 6 additions & 7 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ using ColumnBigFilePtr = std::shared_ptr<ColumnFileBig>;
/// A column file which contains a DMFile. The DMFile could have many Blocks.
class ColumnFileBig : public ColumnFilePersisted
{
friend class ColumnBigFileReader;
friend class ColumnFileBigReader;

private:
DMFilePtr file;
Expand Down Expand Up @@ -71,20 +71,19 @@ class ColumnFileBig : public ColumnFilePersisted

void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;

static ColumnFilePtr deserializeMetadata(DMContext & context, //
const RowKeyRange & segment_range,
ReadBuffer & buf);
static ColumnFilePersistedPtr deserializeMetadata(DMContext & context, //
const RowKeyRange & segment_range,
ReadBuffer & buf);

String toString() const override
{
String s = "{big_file,rows:" + DB::toString(getRows()) //
+ ",bytes:" + DB::toString(getBytes()) + "}"; //
+",saved:" + DB::toString(saved) + "}"; //
return s;
}
};

class ColumnBigFileReader : public ColumnFileReader
class ColumnFileBigReader : public ColumnFileReader
{
private:
const DMContext & context;
Expand Down Expand Up @@ -113,7 +112,7 @@ class ColumnBigFileReader : public ColumnFileReader
size_t readRowsOnce(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range);

public:
ColumnBigFileReader(const DMContext & context_, const ColumnFileBig & column_file_, const ColumnDefinesPtr & col_defs_)
ColumnFileBigReader(const DMContext & context_, const ColumnFileBig & column_file_, const ColumnDefinesPtr & col_defs_)
: context(context_)
, column_file(column_file_)
, col_defs(col_defs_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ void ColumnFileDeleteRange::serializeMetadata(WriteBuffer & buf, bool /*save_sch
delete_range.serialize(buf);
}

ColumnFilePtr ColumnFileDeleteRange::deserializeMetadata(ReadBuffer & buf)
ColumnFilePersistedPtr ColumnFileDeleteRange::deserializeMetadata(ReadBuffer & buf)
{
return std::make_shared<ColumnFileDeleteRange>(RowKeyRange::deserialize(buf));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ class ColumnFileDeleteRange : public ColumnFilePersisted

void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;

static ColumnFilePtr deserializeMetadata(ReadBuffer & buf);
static ColumnFilePersistedPtr deserializeMetadata(ReadBuffer & buf);

String toString() const override { return "{delete_range:" + delete_range.toString() + ", saved: " + DB::toString(saved) + "}"; }
String toString() const override { return "{delete_range:" + delete_range.toString() + "}"; }
};

class ColumnFileEmptyReader : public ColumnFileReader
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_
ColumnFileReaderPtr
ColumnFileInMemory::getReader(const DMContext & /*context*/, const StorageSnapshotPtr & /*storage_snap*/, const ColumnDefinesPtr & col_defs) const
{
return std::make_shared<ColumnInMemoryFileReader>(*this, col_defs);
return std::make_shared<ColumnFileInMemoryReader>(*this, col_defs);
}

bool ColumnFileInMemory::append(DMContext & context, const Block & data, size_t offset, size_t limit, size_t data_bytes)
Expand All @@ -62,8 +62,8 @@ bool ColumnFileInMemory::append(DMContext & context, const Block & data, size_t

for (size_t i = 0; i < cache->block.columns(); ++i)
{
auto & col = data.getByPosition(i).column;
auto & cache_col = *cache->block.getByPosition(i).column;
const auto & col = data.getByPosition(i).column;
const auto & cache_col = *cache->block.getByPosition(i).column;
auto * mutable_cache_col = const_cast<IColumn *>(&cache_col);
mutable_cache_col->insertRangeFrom(*col, offset, limit);
}
Expand All @@ -85,27 +85,27 @@ Block ColumnFileInMemory::readDataForFlush() const
}


ColumnPtr ColumnInMemoryFileReader::getPKColumn()
ColumnPtr ColumnFileInMemoryReader::getPKColumn()
{
memory_file.fillColumns(*col_defs, 1, cols_data_cache);
return cols_data_cache[0];
}

ColumnPtr ColumnInMemoryFileReader::getVersionColumn()
ColumnPtr ColumnFileInMemoryReader::getVersionColumn()
{
memory_file.fillColumns(*col_defs, 2, cols_data_cache);
return cols_data_cache[1];
}

size_t ColumnInMemoryFileReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range)
size_t ColumnFileInMemoryReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range)
{
memory_file.fillColumns(*col_defs, output_cols.size(), cols_data_cache);

auto & pk_col = cols_data_cache[0];
return copyColumnsData(cols_data_cache, pk_col, output_cols, rows_offset, rows_limit, range);
}

Block ColumnInMemoryFileReader::readNextBlock()
Block ColumnFileInMemoryReader::readNextBlock()
{
if (read_done)
return {};
Expand All @@ -118,10 +118,10 @@ Block ColumnInMemoryFileReader::readNextBlock()
return genBlock(*col_defs, columns);
}

ColumnFileReaderPtr ColumnInMemoryFileReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
ColumnFileReaderPtr ColumnFileInMemoryReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
{
// Reuse the cache data.
return std::make_shared<ColumnInMemoryFileReader>(memory_file, new_col_defs, cols_data_cache);
return std::make_shared<ColumnFileInMemoryReader>(memory_file, new_col_defs, cols_data_cache);
}

} // namespace DM
Expand Down
13 changes: 4 additions & 9 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ using ColumnInMemoryFilePtr = std::shared_ptr<ColumnFileInMemory>;
/// A column file which is only resides in memory
class ColumnFileInMemory : public ColumnFile
{
friend class ColumnInMemoryFileReader;
friend class ColumnFileInMemoryReader;

private:
BlockPtr schema;
Expand Down Expand Up @@ -80,11 +80,6 @@ class ColumnFileInMemory : public ColumnFile

Block readDataForFlush() const;

void serializeMetadata(WriteBuffer & /*buf*/, bool /*save_schema*/) const override
{
throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR);
}

String toString() const override
{
String s = "{in_memory_file,rows:" + DB::toString(rows) //
Expand All @@ -97,7 +92,7 @@ class ColumnFileInMemory : public ColumnFile
};


class ColumnInMemoryFileReader : public ColumnFileReader
class ColumnFileInMemoryReader : public ColumnFileReader
{
private:
const ColumnFileInMemory & memory_file;
Expand All @@ -107,7 +102,7 @@ class ColumnInMemoryFileReader : public ColumnFileReader
bool read_done = false;

public:
ColumnInMemoryFileReader(const ColumnFileInMemory & memory_file_,
ColumnFileInMemoryReader(const ColumnFileInMemory & memory_file_,
const ColumnDefinesPtr & col_defs_,
const Columns & cols_data_cache_)
: memory_file(memory_file_)
Expand All @@ -116,7 +111,7 @@ class ColumnInMemoryFileReader : public ColumnFileReader
{
}

ColumnInMemoryFileReader(const ColumnFileInMemory & memory_file_, const ColumnDefinesPtr & col_defs_)
ColumnFileInMemoryReader(const ColumnFileInMemory & memory_file_, const ColumnDefinesPtr & col_defs_)
: memory_file(memory_file_)
, col_defs(col_defs_)
{
Expand Down
Loading